RowKindExtractor is a transformation plugin in Apache SeaTunnel that can convert CDC data streams to Append-Only mode and extract the original RowKind information as a new field. This article introduces the core functionalities of RowKindExtractor, its usage in CDC data synchronization scenarios, configuration options, precautions, and multiple application examples.
RowKindExtractor
The RowKindExtractor transformation plugin is used to convert CDC (Change Data Capture) data streams into Append-Only mode while extracting the original RowKind information into a new field.
Core Functions:
- Convert all RowKind values of data rows to
+I(INSERT), achieving Append-Only mode. - Preserve the original RowKind information (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) in a newly added field.
- Support both SHORT and FULL output formats.
Why is this plugin needed?
In CDC data synchronization scenarios, each data row carries a RowKind flag (+I, -U, +U, -D), representing different types of changes. Some downstream systems (such as data lakes or analytical platforms) only support Append-Only mode and do not support UPDATE or DELETE operations. In such cases, it is necessary to:
- Convert all rows to INSERT type (Append-Only).
- Store the original change type in a regular field for downstream analysis.
Transformation Example:
Input (CDC data):
RowKind: -D (DELETE)
Data: id=1, name="test1", age=20
Output (Append-Only data):
RowKind: +I (INSERT)
Data: id=1, name="test1", age=20, row_kind="DELETE"
Typical Use Cases:
- Writing CDC data into data lakes that only support Append-Only operations.
- Retaining a complete change history in data warehouses.
- Analyzing and aggregating different types of data changes.
Configuration Options
| Parameter Name | Type | Required | Default Value | Description |
|---|---|---|---|---|
| custom_field_name | string | No | row_kind | Name of the new field used to store the original RowKind information |
| transform_type | enum | No | SHORT | Output format of RowKind; possible values: SHORT (short format) or FULL (full format) |
custom_field_name [string]
Specifies the name of the new field used to store the original RowKind information.
Default: row_kind
Notes:
- The field name cannot duplicate an existing field, otherwise an error will occur.
- It is recommended to use meaningful names such as
operation_type,change_type, orcdc_op.
Example:
custom_field_name = "operation_type" # Use a custom field name
transform_type [enum]
Specifies the output format of the RowKind field value.
Options:
| Format | Description | Output Values |
|---|---|---|
| SHORT | Short format (symbolic) | +I, -U, +U, -D |
| FULL | Full format (English name) | INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE |
Default: SHORT
Meaning of each value:
| RowKind Type | SHORT Format | FULL Format | Description |
|---|---|---|---|
| INSERT | +I | INSERT | Insert operation |
| UPDATE_BEFORE | -U | UPDATE_BEFORE | Value before update |
| UPDATE_AFTER | +U | UPDATE_AFTER | Value after update |
| DELETE | -D | DELETE | Delete operation |
Usage Recommendation:
- SHORT format: Saves storage, suitable for space-sensitive scenarios.
- FULL format: More readable, suitable for human inspection or analysis.
Example:
transform_type = FULL # Use full format
Full Examples
Example 1: Using Default Configuration (SHORT Format)
Using default configuration, CDC data is converted to Append-Only mode, with RowKind saved in short format.
env {
parallelism = 1
job.mode = "STREAMING"
}
source {
MySQL-CDC {
plugin_output = "cdc_source"
server-id = 5652
username = "root"
password = "your_password"
table-names = ["mydb.users"]
url = "jdbc:mysql://localhost:3306/mydb"
}
}
transform {
RowKindExtractor {
plugin_input = "cdc_source"
plugin_output = "append_only_data"
# Default configuration:
# custom_field_name = "row_kind"
# transform_type = SHORT
}
}
sink {
Console {
plugin_input = "append_only_data"
}
}
Data Transformation Process:
Input Data (CDC Format):
1. RowKind=+I, id=1, name="Zhang San", age=25
2. RowKind=-U, id=1, name="Zhang San", age=25
3. RowKind=+U, id=1, name="Zhang San", age=26
4. RowKind=-D, id=1, name="Zhang San", age=26
Output Data (Append-Only Format):
1. RowKind=+I, id=1, name="Zhang San", age=25, row_kind="+I"
2. RowKind=+I, id=1, name="Zhang San", age=25, row_kind="-U"
3. RowKind=+I, id=1, name="Zhang San", age=26, row_kind="+U"
4. RowKind=+I, id=1, name="Zhang San", age=26, row_kind="-D"
Example 2: Using FULL Format with Custom Field Name
Output RowKind in full format and use a custom field name.
env {
parallelism = 1
job.mode = "STREAMING"
}
source {
MySQL-CDC {
plugin_output = "cdc_source"
server-id = 5652
username = "root"
password = "your_password"
table-names = ["mydb.orders"]
url = "jdbc:mysql://localhost:3306/mydb"
}
}
transform {
RowKindExtractor {
plugin_input = "cdc_source"
plugin_output = "append_only_data"
custom_field_name = "operation_type" # Custom field name
transform_type = FULL # Use full format
}
}
sink {
Iceberg {
plugin_input = "append_only_data"
catalog_name = "iceberg_catalog"
database = "mydb"
table = "orders_history"
# The Iceberg table will include the operation_type field to record the change type of each row
}
}
Data Transformation Process:
Input Data (CDC Format):
1. RowKind=+I, order_id=1001, amount=100.00
2. RowKind=-U, order_id=1001, amount=100.00
3. RowKind=+U, order_id=1001, amount=150.00
4. RowKind=-D, order_id=1001, amount=150.00
Output Data (Append-Only Format, FULL):
1. RowKind=+I, order_id=1001, amount=100.00, operation_type="INSERT"
2. RowKind=+I, order_id=1001, amount=100.00, operation_type="UPDATE_BEFORE"
3. RowKind=+I, order_id=1001, amount=150.00, operation_type="UPDATE_AFTER"
4. RowKind=+I, order_id=1001, amount=150.00, operation_type="DELETE"
Example 3: Full Test Using FakeSource
Generate test data using FakeSource to demonstrate transformation of various RowKind values.
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
plugin_output = "fake_cdc_data"
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{ kind = INSERT, fields = [1, "A", 100] },
{ kind = INSERT, fields = [2, "B", 100] },
{ kind = UPDATE_BEFORE, fields = [1, "A", 100] },
{ kind = UPDATE_AFTER, fields = [1, "A_updated", 95] },
{ kind = UPDATE_BEFORE, fields = [2, "B", 100] },
{ kind = UPDATE_AFTER, fields = [2, "B_updated", 98] },
{ kind = DELETE, fields = [1, "A_updated", 95] }
]
}
}
transform {
RowKindExtractor {
plugin_input = "fake_cdc_data"
plugin_output = "transformed_data"
custom_field_name = "change_type"
transform_type = FULL
}
}
sink {
Console {
plugin_input = "transformed_data"
}
}
Expected Output:
+I, pk_id=1, name="A", score=100, change_type="INSERT"
+I, pk_id=2, name="B", score=100, change_type="INSERT"
+I, pk_id=1, name="A", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=1, name="A_updated", score=95, change_type="UPDATE_AFTER"
+I, pk_id=2, name="B", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=2, name="B_updated", score=98, change_type="UPDATE_AFTER"
+I, pk_id=1, name="A_updated", score=95, change_type="DELETE"

Top comments (0)