DEV Community

Apache SeaTunnel
Apache SeaTunnel

Posted on

Selective CDC with Apache SeaTunnel: How to Capture Only the Database Changes You Need

1. Overview

In modern data architectures, real-time capture and processing of data changes is a key technology for building data lakes, real-time data warehouses, and business analytics systems. By reading database transaction logs (such as MySQL Binlog), Apache SeaTunnel can efficiently and accurately capture table change events, including INSERT, UPDATE, and DELETE operations.

Apache SeaTunnel natively supports extracting the row_kind metadata column, which records the change type (signal) of each captured record, such as +I (INSERT), -U (UPDATE_BEFORE), +U (UPDATE_AFTER), and -D (DELETE). This enables users to perform more fine-grained control over change streams, such as filtering specific change events through the row_kind field (for example, synchronizing only newly inserted data), thereby building efficient and customized real-time data pipelines.

This technology is widely used in scenarios such as append-only data lake ingestion, preserving complete change histories for downstream analytical systems, and implementing fine-grained filtering logic in streaming ETL processes.

2. Environment Setup

Before starting the demo, prepare the following environment and components:

  • JDK 11
  • Apache SeaTunnel 2.3.12
  • MySQL 5.7

3. SeaTunnel Configuration

1. Preparing SeaTunnel Connector Plugins

First, ensure that your SeaTunnel environment can connect to MySQL.

Edit the config/plugin_config file and add the following two core connectors:

id="l4j1ph"
connector-cdc-mysql
connector-jdbc
Enter fullscreen mode Exit fullscreen mode

After saving the file, execute the installation script:

id="0ksx1w"
sh bin/install-plugin.sh
Enter fullscreen mode Exit fullscreen mode

If online installation is slow or unavailable, you can manually download the corresponding JAR packages from the Maven repository and place them into the connectors directory.

2. Adding the MySQL Driver

Since the MySQL JDBC driver is usually not bundled by default, it must be downloaded manually. Place mysql-connector-java-8.0.28.jar (or your preferred version) into the lib directory of SeaTunnel.

4. Creating MySQL Tables

id="2h22u9"
CREATE TABLE `w`  (
  `id` int(11) NOT NULL,
  `name` varchar(50) CHARACTER SET utf8mb4 NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 ;

CREATE TABLE `w2`  (
  `id` int(11) NOT NULL,
  `name` varchar(50) CHARACTER SET utf8mb4 NULL DEFAULT NULL,
  `row_kind` varchar(10) CHARACTER SET utf8mb4 NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8mb4 ;
Enter fullscreen mode Exit fullscreen mode

Note: Do not set id as the primary key in table w2; otherwise, records will be updated based on the primary key instead of being inserted as new rows.

5. SeaTunnel Job Definition

id="xv3hza"
env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  MySQL-CDC {
    server-id = 5000
    username = "root"
    password = "root"
    table-names = ["cdc.w"]
    url = "jdbc:mysql://localhost:3306/cdc"
  }
}

transform {
  RowKindExtractor {

  }
}

sink {
  jdbc {
        url = "jdbc:mysql://localhost:3306/cdc?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        username = "root"
        password = "root"
        database = cdc
        table = w2
        generate_sink_sql = true
  }
}
Enter fullscreen mode Exit fullscreen mode

Key Notes

  1. RowKindExtractor adds a row_kind flag to each data row, enabling Append-Only mode.

  2. The row_kind field name can be customized:

id="tr2caj"
custom_field_name = "op_type"
Enter fullscreen mode Exit fullscreen mode
  1. Data types support both abbreviated and full formats. The abbreviated format is used by default:
id="p68n79"
transform_type = SHORT     # FULL
Enter fullscreen mode Exit fullscreen mode

Execute the job:

id="mq0owv"
bin/seatunnel.sh -c job/filename -m local
Enter fullscreen mode Exit fullscreen mode

After execution, all data changes in table w will be synchronized to table w2.

6. Testing

1. Insert Data

insert into w values(1,'Alice');
insert into w values(2,'Bob');

mysql> select * from w2;

+----+-------+----------+
| id | name  | row_kind |
+----+-------+----------+
|  1 | Alice | +I       |
|  2 | Bob   | +I       |
+----+-------+----------+
Enter fullscreen mode Exit fullscreen mode

2. Update and Delete Data

id="7rrgg3"
update w set name='Charlie' where id=2;
delete from w where id=2;

mysql> select * from w2;

+----+-------- +----------+
| id | name    | row_kind |
+----+-------- +----------+
|  1 | Alice   | +I       |
|  2 | Bob     | +I       |
|  2 | Charlie | +U       |
|  2 | Charlie | -D       |
+----+--------+----------+
Enter fullscreen mode Exit fullscreen mode

Conclusion: All changes are synchronized downstream in the form of inserted records.

7. Implementing Change Filtering Through Metadata

Using the row_kind metadata field, selective synchronization can easily be implemented within the data pipeline. For example, if only newly inserted records from source table w need to be synchronized to downstream table w2, a WHERE condition can be added in the SQL query to filter the row_kind field.

The core principle lies in row-level change event markers:

For UPDATE operations, two consecutive events are generated:

  • -U (UPDATE_BEFORE), representing the old value
  • +U (UPDATE_AFTER), representing the new value

DELETE operations generate the -D event.

By filtering row_kind = '+I', only INSERT events are captured and forwarded downstream, while UPDATE and DELETE events are ignored. This enables business scenarios such as source-stream snapshots and append-only data ingestion.

Technical Implementation Example

id="h7c5lc"
transform {
  RowKindExtractor {
    plugin_input = "mysql_source"
    plugin_output = "trans_row"
  }

  Sql {
    plugin_input = "trans_row"
    plugin_output = "trans_sql"
    query = "select id,name from trans_row where row_kind = '+I'";
  }
}
Enter fullscreen mode Exit fullscreen mode

After adding the change marker field, SQL filtering can be used to retain only newly inserted data and write it to the downstream table w2 in real time.

UPDATE and DELETE events are filtered out and will not be transmitted downstream.

8. Test Verification and Result Analysis

To verify the effectiveness of the row_kind filtering logic, we performed a series of operations on the source table w and observed the changes in the target table w2.

In this scenario, table w2 no longer requires the row_kind field.

Test Steps and Observations

1. Insert Data

id="8e4l5x"
INSERT INTO w VALUES(1,'Alice');
INSERT INTO w VALUES(2,'Bob');

mysql> select * from w2;

+----+--------+----------+
| id | name   | row_kind |
+----+--------+----------+
|  1 | Alice  | +I       |
|  2 | Bob    | +I       |
+----+--------+----------+
Enter fullscreen mode Exit fullscreen mode

2. Update Data

id="7cllq7"
UPDATE w SET name='Charlie' WHERE id=2;
DELETE FROM w WHERE id=2;
Enter fullscreen mode Exit fullscreen mode

No changes will appear in table w2.

Top comments (0)