DEV Community

Apache SeaTunnel
Apache SeaTunnel

Posted on

How Does Apache SeaTunnel Convert CDC Streams to Append-Only Mode?

RowKindExtractor is a transformation plugin in Apache SeaTunnel that can convert CDC data streams to Append-Only mode and extract the original RowKind information as a new field. This article introduces the core functionalities of RowKindExtractor, its usage in CDC data synchronization scenarios, configuration options, precautions, and multiple application examples.

RowKindExtractor

The RowKindExtractor transformation plugin is used to convert CDC (Change Data Capture) data streams into Append-Only mode while extracting the original RowKind information into a new field.

Core Functions:

  • Convert all RowKind values of data rows to +I (INSERT), achieving Append-Only mode.
  • Preserve the original RowKind information (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) in a newly added field.
  • Support both SHORT and FULL output formats.

Why is this plugin needed?

In CDC data synchronization scenarios, each data row carries a RowKind flag (+I, -U, +U, -D), representing different types of changes. Some downstream systems (such as data lakes or analytical platforms) only support Append-Only mode and do not support UPDATE or DELETE operations. In such cases, it is necessary to:

  1. Convert all rows to INSERT type (Append-Only).
  2. Store the original change type in a regular field for downstream analysis.

Transformation Example:

Input (CDC data):
 RowKind: -D (DELETE)
 Data: id=1, name="test1", age=20

Output (Append-Only data):
 RowKind: +I (INSERT)
 Data: id=1, name="test1", age=20, row_kind="DELETE"
Enter fullscreen mode Exit fullscreen mode

Typical Use Cases:

  • Writing CDC data into data lakes that only support Append-Only operations.
  • Retaining a complete change history in data warehouses.
  • Analyzing and aggregating different types of data changes.

Configuration Options

Parameter Name Type Required Default Value Description
custom_field_name string No row_kind Name of the new field used to store the original RowKind information
transform_type enum No SHORT Output format of RowKind; possible values: SHORT (short format) or FULL (full format)

custom_field_name [string]

Specifies the name of the new field used to store the original RowKind information.

Default: row_kind

Notes:

  • The field name cannot duplicate an existing field, otherwise an error will occur.
  • It is recommended to use meaningful names such as operation_type, change_type, or cdc_op.

Example:

custom_field_name = "operation_type"  # Use a custom field name
Enter fullscreen mode Exit fullscreen mode

transform_type [enum]

Specifies the output format of the RowKind field value.

Options:

Format Description Output Values
SHORT Short format (symbolic) +I, -U, +U, -D
FULL Full format (English name) INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE

Default: SHORT

Meaning of each value

RowKind Type SHORT Format FULL Format Description
INSERT +I INSERT Insert operation
UPDATE_BEFORE -U UPDATE_BEFORE Value before update
UPDATE_AFTER +U UPDATE_AFTER Value after update
DELETE -D DELETE Delete operation

Usage Recommendation:

  • SHORT format: Saves storage, suitable for space-sensitive scenarios.
  • FULL format: More readable, suitable for human inspection or analysis.

Example:

transform_type = FULL  # Use full format
Enter fullscreen mode Exit fullscreen mode

Full Examples

Example 1: Using Default Configuration (SHORT Format)

Using default configuration, CDC data is converted to Append-Only mode, with RowKind saved in short format.

env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  MySQL-CDC {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.users"]
    url = "jdbc:mysql://localhost:3306/mydb"
  }
}

transform {
  RowKindExtractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    # Default configuration:
    # custom_field_name = "row_kind"
    # transform_type = SHORT
  }
}

sink {
  Console {
    plugin_input = "append_only_data"
  }
}
Enter fullscreen mode Exit fullscreen mode

Data Transformation Process:

Input Data (CDC Format):
 1. RowKind=+I, id=1, name="Zhang San", age=25
 2. RowKind=-U, id=1, name="Zhang San", age=25
 3. RowKind=+U, id=1, name="Zhang San", age=26
 4. RowKind=-D, id=1, name="Zhang San", age=26

Output Data (Append-Only Format):
 1. RowKind=+I, id=1, name="Zhang San", age=25, row_kind="+I"
 2. RowKind=+I, id=1, name="Zhang San", age=25, row_kind="-U"
 3. RowKind=+I, id=1, name="Zhang San", age=26, row_kind="+U"
 4. RowKind=+I, id=1, name="Zhang San", age=26, row_kind="-D"
Enter fullscreen mode Exit fullscreen mode

Example 2: Using FULL Format with Custom Field Name

Output RowKind in full format and use a custom field name.

env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  MySQL-CDC {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.orders"]
    url = "jdbc:mysql://localhost:3306/mydb"
  }
}

transform {
  RowKindExtractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    custom_field_name = "operation_type"  # Custom field name
    transform_type = FULL                 # Use full format
  }
}

sink {
  Iceberg {
    plugin_input = "append_only_data"
    catalog_name = "iceberg_catalog"
    database = "mydb"
    table = "orders_history"
    # The Iceberg table will include the operation_type field to record the change type of each row
  }
}
Enter fullscreen mode Exit fullscreen mode

Data Transformation Process:

Input Data (CDC Format):
 1. RowKind=+I, order_id=1001, amount=100.00
 2. RowKind=-U, order_id=1001, amount=100.00
 3. RowKind=+U, order_id=1001, amount=150.00
 4. RowKind=-D, order_id=1001, amount=150.00

Output Data (Append-Only Format, FULL):
 1. RowKind=+I, order_id=1001, amount=100.00, operation_type="INSERT"
 2. RowKind=+I, order_id=1001, amount=100.00, operation_type="UPDATE_BEFORE"
 3. RowKind=+I, order_id=1001, amount=150.00, operation_type="UPDATE_AFTER"
 4. RowKind=+I, order_id=1001, amount=150.00, operation_type="DELETE"
Enter fullscreen mode Exit fullscreen mode

Example 3: Full Test Using FakeSource

Generate test data using FakeSource to demonstrate transformation of various RowKind values.

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    plugin_output = "fake_cdc_data"
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      { kind = INSERT, fields = [1, "A", 100] },
      { kind = INSERT, fields = [2, "B", 100] },
      { kind = UPDATE_BEFORE, fields = [1, "A", 100] },
      { kind = UPDATE_AFTER, fields = [1, "A_updated", 95] },
      { kind = UPDATE_BEFORE, fields = [2, "B", 100] },
      { kind = UPDATE_AFTER, fields = [2, "B_updated", 98] },
      { kind = DELETE, fields = [1, "A_updated", 95] }
    ]
  }
}

transform {
  RowKindExtractor {
    plugin_input = "fake_cdc_data"
    plugin_output = "transformed_data"
    custom_field_name = "change_type"
    transform_type = FULL
  }
}

sink {
  Console {
    plugin_input = "transformed_data"
  }
}
Enter fullscreen mode Exit fullscreen mode

Expected Output:

+I, pk_id=1, name="A", score=100, change_type="INSERT"
+I, pk_id=2, name="B", score=100, change_type="INSERT"
+I, pk_id=1, name="A", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=1, name="A_updated", score=95, change_type="UPDATE_AFTER"
+I, pk_id=2, name="B", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=2, name="B_updated", score=98, change_type="UPDATE_AFTER"
+I, pk_id=1, name="A_updated", score=95, change_type="DELETE"
Enter fullscreen mode Exit fullscreen mode

Top comments (0)