
DMetaSoul


Solved a practical business problem when using Hudi: LakeSoul supports null field non-override semantics

Recently, the LakeSoul R&D team helped a user solve a practical business problem they ran into while using Hudi. This post records the problem and the solution. In the user's pipeline, the upstream system extracts raw data from online DB tables, converts it to JSON, and writes it to Kafka. The downstream system uses Spark to read the messages from Kafka, updates and aggregates the data with Hudi, and sends the result to a downstream database for analysis.

Some Kafka messages contain only a subset of the original table's fields. Sample Kafka data: {A: A1, C: C4, D: D6, E: E7} {A: A2, B: B4, E: E6} {A: A3, B: B5, C: C5, D: D5, E: E5}. In subsequent updates, a field that is missing from a message must not be updated; it should keep its latest historical value.
The following figure simplifies the data update process. The original table has five fields: A, B, C, D, and E. Field A is the primary key, and its type is string. Spark reads a batch of data from Kafka and converts it to the format required by Upsert (a DataFrame with a fixed schema). The new table contents are then read with MOR (Merge on Read).
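The conversion to a fixed schema can be sketched without Spark. The following is a minimal illustration only: plain Scala collections stand in for the DataFrame, and the names `SchemaAlignSketch`, `align`, `messages`, and `rows` are hypothetical. It pads each sample message to the five columns and fills absent fields with null.

```scala
// Sketch only: plain Scala collections stand in for the Spark DataFrame.
object SchemaAlignSketch {
  // The five columns of the original table; A is the primary key.
  val schema: Seq[String] = Seq("A", "B", "C", "D", "E")

  // Pad a sparse record to the fixed schema; absent fields become null.
  def align(record: Map[String, String]): Seq[String] =
    schema.map(field => record.getOrElse(field, null))

  // The three sample messages from Kafka, assumed to be already parsed from JSON.
  val messages: Seq[Map[String, String]] = Seq(
    Map("A" -> "A1", "C" -> "C4", "D" -> "D6", "E" -> "E7"),
    Map("A" -> "A2", "B" -> "B4", "E" -> "E6"),
    Map("A" -> "A3", "B" -> "B5", "C" -> "C5", "D" -> "D5", "E" -> "E5")
  )

  // One fixed-width row per message, in schema order.
  val rows: Seq[Seq[String]] = messages.map(align)
}
```

After alignment, the second message becomes the row (A2, B4, null, null, E6). Those null values are exactly what must not overwrite existing data during Upsert.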

The user currently implements this process with Hudi's Merge on Read, which requires a fixed schema and therefore does not support the misaligned JSON data above. If a missing field is filled with a null value, the invalid null overwrites the original content. If Merge Into is used instead, the write degrades to Copy on Write, and write performance fails to meet requirements. A workaround is to read the unchanged values from the original table and complete each record before writing. However, this increases resource costs and development workload, which does not meet the user's expectations.

LakeSoul supports custom MergeOperators. When performing Upsert, each field can be given a user-defined MergeOperator. Its parameters are the field's original value and the new value from Upsert, so the merge result can be decided according to business requirements. A custom MergeOperator is used like a native Spark UDF. Upsert writes are keyed by primary key, so multiple delta files may hold different values for the same primary key and the same field. The MergeOperator controls how these values are merged. The default MergeOperator implementation is as follows:

class DefaultMergeOp[T] extends MergeOperator[T] {
  override def mergeData(input: Seq[T]): T = {
    input.last
  }
}
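To see why the default behavior is a problem for sparse messages, here is a small self-contained sketch. The `MergeOperator` trait below is a local stand-in so the snippet runs without LakeSoul on the classpath (the real class ships with the library), and it assumes the input sequence is ordered from the oldest value to the newest. The values are taken from the sample data, padded with null as described earlier.

```scala
object DefaultMergeSketch {
  // Local stand-in for LakeSoul's MergeOperator; an assumption for illustration.
  trait MergeOperator[T] { def mergeData(input: Seq[T]): T }

  // Same logic as the default operator: the last value wins.
  class DefaultMergeOp[T] extends MergeOperator[T] {
    override def mergeData(input: Seq[T]): T = input.last
  }

  // Values of field B for one primary key, assumed oldest first.
  // The second upsert did not carry B, so it was padded with null.
  val versionsOfB: Seq[String] = Seq("B4", null)

  // The padded null is the last element, so it replaces "B4".
  val merged: String = new DefaultMergeOp[String].mergeData(versionsOfB)
}
```

With last-wins merging, the padded null replaces the earlier value B4, which is the invalid overwrite the user wanted to avoid.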

In this scenario, you can define a custom MergeOperator. Fields missing from a message are still filled with null, which serves as a special marker (the business guarantees that the marker does not conflict with normal data). During Merge, the MergeOperator ignores the marker and returns the original value. In this way, when Spark processes the JSON data and executes Upsert, null values are ignored and the original content is not overwritten. This removes the step of filling in missing fields from the original data, which significantly improves execution efficiency and simplifies the code logic. The code for this custom MergeOperator is as follows:


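class MergeNonNullOp[T] extends MergeOperator[T] {
  override def mergeData(input: Seq[T]): T = {
    // Drop real nulls and the "null" string marker used for missing fields.
    val output = input.filter(v => v != null && !v.equals("null"))
    // Keep the latest remaining value; return null rather than throwing
    // when every version of the field is missing.
    output.lastOption.getOrElse(null.asInstanceOf[T])
  }
}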
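The effect on a whole row can be sketched as follows. This is an illustration under stated assumptions, not LakeSoul's actual merge path: the `MergeOperator` trait is a local stand-in, the existing row for key A2 is hypothetical, and the operator is a slightly defensive variant that returns null instead of throwing when a field has no non-null version. The operator is applied to each field across the versions of one primary key, ordered oldest first.

```scala
object NonNullMergeSketch {
  // Local stand-in for LakeSoul's MergeOperator; an assumption for illustration.
  trait MergeOperator[T] { def mergeData(input: Seq[T]): T }

  class MergeNonNullOp[T] extends MergeOperator[T] {
    override def mergeData(input: Seq[T]): T = {
      // Ignore real nulls and the "null" string marker.
      val output = input.filter(v => v != null && !v.equals("null"))
      // Latest surviving value, or null if nothing survives.
      output.lastOption.getOrElse(null.asInstanceOf[T])
    }
  }

  val fields: Seq[String] = Seq("A", "B", "C", "D", "E")

  // Versions of the row with primary key A2, oldest first.
  val versions: Seq[Seq[String]] = Seq(
    Seq("A2", "B2", "C2", "D2", "E2"), // hypothetical existing row
    Seq("A2", "B4", null, null, "E6")  // sample message padded with null
  )

  val op = new MergeNonNullOp[String]

  // Merge field by field: collect column i from every version, then merge.
  val merged: Seq[String] =
    fields.indices.map(i => op.mergeData(versions.map(_(i))))
}
```

Here B and E take the new values B4 and E6, while C and D keep C2 and D2 because the incoming nulls are ignored.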

As you can see, a simple custom implementation of MergeOperator solves an otherwise complex business problem.

The LakeSoul team plans to build a MergeOperator that ignores null fields into LakeSoul itself, with a global option that controls whether it is enabled by default, to further improve development efficiency. See GitHub issue: https://github.com/meta-soul/LakeSoul/issues/30.

In the future, LakeSoul will also support Merge Into SQL syntax for defining Upsert behavior with Merge on Read, further improving how stream and batch write updates can be expressed.
For more information on LakeSoul, the cloud-native unified stream and batch table storage framework, refer to the previous articles:
Build a real-time machine learning sample library using the best open-source project about big data and data lakehouse, LakeSoul

Design concept of a best opensource project about big data and data lakehouse

4 best opensource projects about big data you should try out
