1. Overview
In modern data architectures, real-time capture and processing of data changes is a key technology for building data lakes, real-time data warehouses, and business analytics systems. By reading database transaction logs (such as the MySQL binlog), Apache SeaTunnel can efficiently and accurately capture table change events, including INSERT, UPDATE, and DELETE operations.
Through its built-in RowKindExtractor transform, Apache SeaTunnel can expose the change type of each captured record as a row_kind column. Its values are +I (INSERT), -U (UPDATE_BEFORE), +U (UPDATE_AFTER), and -D (DELETE). This gives users fine-grained control over change streams. For example, filtering on row_kind lets a pipeline synchronize only newly inserted rows, which makes it easy to build customized real-time data pipelines.
This technology is widely used in scenarios such as append-only data lake ingestion, preserving complete change histories for downstream analytical systems, and implementing fine-grained filtering logic in streaming ETL processes.
2. Environment Setup
Before starting the demo, prepare the following environment and components:
- JDK 11
- Apache SeaTunnel 2.3.12
- MySQL 5.7
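Binlog-based CDC also depends on how MySQL 5.7 writes its binlog. Binary logging must be enabled, binlog_format must be ROW, and binlog_row_image must be FULL. These are the settings binlog-based CDC tools commonly require, so confirm them against the SeaTunnel MySQL-CDC documentation for your version. The Python sketch below checks the three variables. It assumes you have already fetched them with SHOW VARIABLES, and the function name missing_cdc_settings is hypothetical.

```python
# Sketch: check MySQL server variables against the usual requirements for
# binlog-based CDC. The input dict is assumed to hold the output of:
#   SHOW VARIABLES WHERE Variable_name IN
#     ('log_bin', 'binlog_format', 'binlog_row_image');
from typing import Dict, List

REQUIRED: Dict[str, str] = {
    "log_bin": "ON",             # binary logging must be enabled
    "binlog_format": "ROW",      # row-level events, not SQL statements
    "binlog_row_image": "FULL",  # complete before/after images for updates
}


def missing_cdc_settings(variables: Dict[str, str]) -> List[str]:
    """Describe every required setting whose value does not match."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = variables.get(name)
        if actual is None or actual.upper() != expected:
            problems.append(f"{name}: expected {expected}, got {actual}")
    return problems


if __name__ == "__main__":
    sample = {"log_bin": "ON", "binlog_format": "STATEMENT",
              "binlog_row_image": "FULL"}
    for problem in missing_cdc_settings(sample):
        print(problem)  # binlog_format: expected ROW, got STATEMENT
```

An empty list means the server is ready for the CDC source. Any entry names a variable to change in the MySQL configuration before starting the job.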
3. SeaTunnel Configuration
1. Preparing SeaTunnel Connector Plugins
First, make sure SeaTunnel has the connectors it needs to read from and write to MySQL.
Edit the config/plugin_config file and add the following two core connectors:
connector-cdc-mysql
connector-jdbc
After saving the file, execute the installation script:
sh bin/install-plugin.sh
If online installation is slow or unavailable, you can manually download the corresponding JAR packages from the Maven repository and place them into the connectors directory.
2. Adding the MySQL Driver
The MySQL JDBC driver is not bundled with SeaTunnel, so it must be downloaded manually. Place mysql-connector-java-8.0.28.jar (or another 8.x version) into the lib directory of SeaTunnel. The job below uses the driver class com.mysql.cj.jdbc.Driver, which ships with Connector/J 8.x.
4. Creating MySQL Tables
CREATE TABLE `w` (
`id` int(11) NOT NULL,
`name` varchar(50) CHARACTER SET utf8mb4 NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 ;
CREATE TABLE `w2` (
`id` int(11) NOT NULL,
`name` varchar(50) CHARACTER SET utf8mb4 NULL DEFAULT NULL,
`row_kind` varchar(10) CHARACTER SET utf8mb4 NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8mb4 ;
Note: Do not set id as the primary key in table w2; otherwise, records will be updated based on the primary key instead of being inserted as new rows.
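The effect described in the note can be reproduced in a few lines of Python. This is a sketch under stated assumptions. An in-memory SQLite database stands in for MySQL, and SQLite's INSERT OR REPLACE stands in for a key-based upsert. The real JDBC sink's write path may differ in detail, and the three events for id 2 are hypothetical.

```python
# Sketch: why w2 should not have a primary key. An in-memory SQLite
# database stands in for MySQL, and INSERT OR REPLACE stands in for a
# key-based upsert; the real JDBC sink's write path may differ in detail.
import sqlite3
from typing import List, Tuple

# Hypothetical events for id=2 after RowKindExtractor: (id, name, row_kind)
EVENTS = [(2, "Bob", "+I"), (2, "Charlie", "+U"), (2, "Charlie", "-D")]


def rows_after_writes(with_primary_key: bool) -> List[Tuple]:
    conn = sqlite3.connect(":memory:")
    key = " PRIMARY KEY" if with_primary_key else ""
    conn.execute(f"CREATE TABLE w2 (id INTEGER{key}, name TEXT, row_kind TEXT)")
    # With a key, each write replaces the row that has the same id;
    # without one, every event is appended as a new row.
    verb = "INSERT OR REPLACE" if with_primary_key else "INSERT"
    conn.executemany(f"{verb} INTO w2 VALUES (?, ?, ?)", EVENTS)
    rows = conn.execute(
        "SELECT id, name, row_kind FROM w2 ORDER BY rowid").fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    print(rows_after_writes(with_primary_key=False))  # all three events kept
    print(rows_after_writes(with_primary_key=True))   # only the last one survives
```

Without a key, w2 keeps the full history for id 2. With a key, every write lands on the same row, and only the final -D event remains.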
5. SeaTunnel Job Definition
env {
parallelism = 1
job.mode = "STREAMING"
}
source {
MySQL-CDC {
server-id = 5000
username = "root"
password = "root"
table-names = ["cdc.w"]
url = "jdbc:mysql://localhost:3306/cdc"
}
}
transform {
RowKindExtractor {
}
}
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/cdc?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
username = "root"
password = "root"
database = cdc
table = w2
generate_sink_sql = true
}
}
Key Notes
- RowKindExtractor adds a row_kind field to each data row and emits every record as an insert, which turns the change stream into an append-only stream. The row_kind field name can be customized:
custom_field_name = "op_type"
- The row_kind value can be written in an abbreviated format (+I, -U, +U, -D) or a full format (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE). The abbreviated format is the default:
transform_type = SHORT  # or FULL
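The two options above can be pictured with a small Python model of what the transform does to each record. It copies the record, stores the change type in an extra column, and re-emits the record as an INSERT. This is an illustration, not SeaTunnel's code. The function extract_row_kind is hypothetical. The FULL labels are assumed to match the RowKind names (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).

```python
# Illustrative model of RowKindExtractor: copy the record, store its change
# type in an extra column, and re-emit it as an INSERT. Not SeaTunnel's
# actual code; the parameters mirror custom_field_name and transform_type.
from enum import Enum
from typing import Any, Dict, Tuple


class RowKind(Enum):
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


def extract_row_kind(kind: RowKind, row: Dict[str, Any],
                     custom_field_name: str = "row_kind",
                     transform_type: str = "SHORT") -> Tuple[RowKind, Dict[str, Any]]:
    if transform_type == "SHORT":
        label = kind.value   # abbreviated form, e.g. "+U"
    elif transform_type == "FULL":
        label = kind.name    # full form, e.g. "UPDATE_AFTER"
    else:
        raise ValueError(f"unknown transform_type: {transform_type}")
    # Every output record is an insert, so the stream becomes append-only.
    return RowKind.INSERT, {**row, custom_field_name: label}


if __name__ == "__main__":
    print(extract_row_kind(RowKind.DELETE, {"id": 2, "name": "Charlie"}))
    print(extract_row_kind(RowKind.DELETE, {"id": 2, "name": "Charlie"},
                           custom_field_name="op_type", transform_type="FULL"))
```

Because every output record is an insert, a downstream table without a primary key simply accumulates one row per change event.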
Execute the job:
bin/seatunnel.sh -c job/filename -m local
Once the job is running, every change to table w is appended to table w2.
6. Testing
1. Insert Data
insert into w values(1,'Alice');
insert into w values(2,'Bob');
mysql> select * from w2;
+----+-------+----------+
| id | name  | row_kind |
+----+-------+----------+
|  1 | Alice | +I       |
|  2 | Bob   | +I       |
+----+-------+----------+
2. Update and Delete Data
update w set name='Charlie' where id=2;
delete from w where id=2;
mysql> select * from w2;
+----+---------+----------+
| id | name    | row_kind |
+----+---------+----------+
|  1 | Alice   | +I       |
|  2 | Bob     | +I       |
|  2 | Charlie | +U       |
|  2 | Charlie | -D       |
+----+---------+----------+
Conclusion: All changes are synchronized downstream in the form of inserted records.
7. Implementing Change Filtering Through Metadata
With the row_kind field in place, selective synchronization is easy to implement within the pipeline. For example, if only newly inserted records from source table w should reach downstream table w2, add a Sql transform whose WHERE clause filters on row_kind.
This works because every row-level change event carries a change-type marker:
- INSERT operations generate a +I event.
- UPDATE operations generate two consecutive events: -U (UPDATE_BEFORE), carrying the old values, and +U (UPDATE_AFTER), carrying the new values.
- DELETE operations generate a -D event.
By filtering row_kind = '+I', only INSERT events are captured and forwarded downstream, while UPDATE and DELETE events are ignored. This enables business scenarios such as source-stream snapshots and append-only data ingestion.
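The event model and the +I filter can be sketched in plain Python. Each DML operation expands into one or more row-level events, and keeping only +I events drops updates and deletes. The (operation, before, after) tuples and the function names to_events and inserts_only are hypothetical. They illustrate the rule described above and do not reflect how SeaTunnel represents events internally.

```python
# Sketch of the event model above: each DML operation becomes one or more
# row-level events, and keeping only '+I' drops updates and deletes.
# The (operation, before, after) tuples are a hypothetical representation.
from typing import Dict, List, Optional, Tuple

Row = Dict[str, object]
Event = Tuple[str, Optional[Row]]


def to_events(op: str, before: Optional[Row], after: Optional[Row]) -> List[Event]:
    if op == "INSERT":
        return [("+I", after)]
    if op == "UPDATE":
        return [("-U", before), ("+U", after)]  # old values, then new values
    if op == "DELETE":
        return [("-D", before)]
    raise ValueError(f"unsupported operation: {op}")


def inserts_only(events: List[Event]) -> List[Optional[Row]]:
    """Equivalent of filtering on row_kind = '+I'."""
    return [row for kind, row in events if kind == "+I"]


if __name__ == "__main__":
    ops = [
        ("INSERT", None, {"id": 1, "name": "Alice"}),
        ("INSERT", None, {"id": 2, "name": "Bob"}),
        ("UPDATE", {"id": 2, "name": "Bob"}, {"id": 2, "name": "Charlie"}),
        ("DELETE", {"id": 2, "name": "Charlie"}, None),
    ]
    events = [event for op in ops for event in to_events(*op)]
    print([kind for kind, _ in events])  # ['+I', '+I', '-U', '+U', '-D']
    print(inserts_only(events))  # [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]
```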
Technical Implementation Example
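transform {
# The MySQL-CDC source must declare plugin_output = "mysql_source"
RowKindExtractor {
plugin_input = "mysql_source"
plugin_output = "trans_row"
}
# Keep only inserts; the jdbc sink should read plugin_input = "trans_sql"
Sql {
plugin_input = "trans_row"
plugin_output = "trans_sql"
query = "select id,name from trans_row where row_kind = '+I'"
}
}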
After adding the change marker field, SQL filtering can be used to retain only newly inserted data and write it to the downstream table w2 in real time.
UPDATE and DELETE events are filtered out and will not be transmitted downstream.
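To see what the Sql transform's query keeps, you can run the same statement against a small table. The Python sketch below loads a hypothetical event stream into an in-memory SQLite table named trans_row and executes the query unchanged. SQLite is only a stand-in, because SeaTunnel evaluates the query with its own SQL engine. The function name run_filter is hypothetical.

```python
# Runs the Sql transform's query against an in-memory SQLite table named
# trans_row that stands in for RowKindExtractor's output. SQLite is only a
# stand-in here; SeaTunnel evaluates the query with its own SQL engine.
import sqlite3
from typing import List, Tuple

QUERY = "select id,name from trans_row where row_kind = '+I'"


def run_filter(rows: List[Tuple[int, str, str]]) -> List[Tuple[int, str]]:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE trans_row (id INTEGER, name TEXT, row_kind TEXT)")
    conn.executemany("INSERT INTO trans_row VALUES (?, ?, ?)", rows)
    result = conn.execute(QUERY).fetchall()
    conn.close()
    return result


if __name__ == "__main__":
    # Hypothetical stream for the insert/update/delete sequence in this article
    stream = [
        (1, "Alice", "+I"), (2, "Bob", "+I"),
        (2, "Bob", "-U"), (2, "Charlie", "+U"), (2, "Charlie", "-D"),
    ]
    print(run_filter(stream))
```

Only the two +I rows pass the WHERE clause. The -U, +U, and -D rows never reach the sink, which matches the behavior the test below expects in w2.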
8. Test Verification and Result Analysis
To verify the effectiveness of the row_kind filtering logic, we performed a series of operations on the source table w and observed the changes in the target table w2.
Because the Sql transform selects only id and name, table w2 no longer needs the row_kind column in this scenario.
Test Steps and Observations
1. Insert Data
INSERT INTO w VALUES(1,'Alice');
INSERT INTO w VALUES(2,'Bob');
mysql> select * from w2;
+----+-------+----------+
| id | name  | row_kind |
+----+-------+----------+
|  1 | Alice | +I       |
|  2 | Bob   | +I       |
+----+-------+----------+
2. Update and Delete Data
UPDATE w SET name='Charlie' WHERE id=2;
DELETE FROM w WHERE id=2;
Table w2 does not change. The Sql transform drops the update and delete events before they reach the sink.