Real-Time Incremental Data Capture: Change Data Capture (CDC)
Official MySQL-CDC Documentation:
https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/MySQL-CDC/
- A single SeaTunnel CDC job can monitor multiple tables simultaneously and synchronize them in real time.
- You must use MySQL JDBC Driver 8.0.33 or later.
1. How Many Databases Does SeaTunnel Support for CDC?
Documentation:
https://seatunnel.apache.org/docs/2.3.12/connector-v2/source
Official CDC Source Connectors Supported by SeaTunnel:
- MongoDB CDC Source Connector
- MySQL CDC Source Connector
- OpenGauss CDC Source Connector
- Oracle CDC Source Connector
- PostgreSQL CDC Source Connector
- SQL Server CDC Source Connector
- TiDB CDC Source Connector
2. CDC Jobs Do Not Stop After SeaTunnel Starts
A standard CDC (Change Data Capture) job is designed to run continuously and indefinitely as a streaming service.
Core Difference
- Offline / Batch Processing Jobs (`job.mode = "BATCH"`)
  - Execute a full or incremental SQL query once.
  - The job automatically finishes after processing all existing data.
- CDC / Streaming Jobs (`job.mode = "STREAMING"`)
  After startup, the job will:
  1. Optionally perform an **initial full snapshot** (when `startup.mode = "initial"` is configured).
  2. Switch to continuously monitoring the MySQL Binlog stream.
  3. Capture, process, and write new database changes (INSERT, UPDATE, DELETE) as they arrive.
  4. Continue monitoring until the job is manually stopped or fails due to an error.
How to Stop a CDC Job
In a SeaTunnel terminal session, you can usually stop a CDC job gracefully using:
Ctrl + C
In production environments, jobs are typically stopped through schedulers, orchestration systems, or management platforms.
In simple terms, a CDC job behaves like a persistent subscription service that subscribes to database change logs and processes events in real time whenever they occur.
This is fundamentally different from a traditional batch job that runs once and exits.
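The lifecycle described above can be sketched in a few lines of Python. This is a conceptual illustration only, not SeaTunnel's implementation: `cdc_stream` and the sample rows are hypothetical names, and a real Binlog stream never ends, whereas the list used here is finite so the example terminates.

```python
from typing import Iterable, Iterator, Tuple


def cdc_stream(snapshot_rows: Iterable[dict],
               binlog_events: Iterable[Tuple[str, dict]],
               startup_mode: str = "initial") -> Iterator[Tuple[str, dict]]:
    """Yield (op, row) pairs the way a CDC source conceptually emits them."""
    if startup_mode == "initial":
        # Phase 1: existing rows are emitted as inserts (the full snapshot).
        for row in snapshot_rows:
            yield ("+I", row)
    elif startup_mode != "latest":
        raise ValueError(f"unsupported startup mode: {startup_mode}")
    # Phase 2: changes are forwarded as they arrive. With a real Binlog
    # this loop would keep running until the job is stopped.
    for op, row in binlog_events:
        yield (op, row)


existing = [{"id": 1, "age": 30}, {"id": 2, "age": 41}]
changes = [("+I", {"id": 3, "age": 19}), ("-D", {"id": 1, "age": 30})]

initial_events = list(cdc_stream(existing, changes, "initial"))
latest_events = list(cdc_stream(existing, changes, "latest"))
print(len(initial_events), len(latest_events))
```

With `initial`, the two existing rows are emitted before the two changes; with `latest`, only the two changes are seen.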
💡 Configuration Recommendations and Common Pitfalls
- About `startup.mode`
  For first-time deployments, `initial` is strongly recommended. This ensures:
  - A complete snapshot of current data is captured first.
  - Incremental synchronization follows seamlessly afterward.
  If you configure `latest`, all historical data is skipped and only changes occurring after startup are captured.
- About `server-id`
  In production environments, always configure a unique and explicit value.
  Avoid relying on random values, because an ID conflict can cause instability and unexpected behavior.
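One way to catch `server-id` conflicts before deployment is a small pre-flight check. The sketch below is a hypothetical helper, not part of SeaTunnel: it assumes you maintain your own inventory of IDs already in use, and it accepts either a single ID or the `start-end` range form.

```python
def parse_server_ids(spec: str) -> range:
    """Parse "5400" or "5400-5408" into a range of candidate IDs."""
    start, _, end = spec.partition("-")
    first = int(start)
    last = int(end) if end else first
    if last < first:
        raise ValueError(f"invalid server-id range: {spec}")
    return range(first, last + 1)


def find_conflicts(spec: str, ids_in_use: set) -> list:
    """Return the IDs from spec that are already taken."""
    return [i for i in parse_server_ids(spec) if i in ids_in_use]


# Hypothetical inventory: 1 is the MySQL server itself, 5402 is another CDC job.
in_use = {1, 5402}
print(find_conflicts("5400", in_use))
print(find_conflicts("5400-5404", in_use))
```

An empty result means the chosen ID or range does not collide with anything in the inventory.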
3. CDC Mode vs JDBC Mode
Comparison
| Feature | MySQL CDC (Streaming) | JDBC Batch Processing |
|---|---|---|
| Job Mode | `STREAMING` | `BATCH` |
| Data Source | Database Binlog | SQL Query Result Set |
| Data Content | Change Event Stream (includes operation type, before/after images, metadata) | Static Data Snapshot |
| Core Configuration | `table-names` | `query` or `table` + `sql` |
| Synchronization Type | Real-time incremental (optionally full snapshot first) | One-time full or incremental batch |
| Job Lifecycle | Runs continuously until manually stopped | Automatically ends after reading all data |
| Typical Use Cases | Real-time warehouses, analytics, disaster recovery | T+1 reports, migrations, historical backfill |
| Source Database Load | Low, continuous Binlog reading | Query-intensive batch execution |
| Data Consistency | Exactly-once guarantees | Depends on query conditions; duplicates or omissions may occur |
Architecture Comparison
Under CDC mode, you cannot use a custom SQL query in the source connector like you would with JDBC.
For example:
SELECT * FROM table WHERE age > 20
is not supported in the CDC Source.
If filtering or transformation is required, it must be implemented in the Transform stage.
Why CDC Does Not Support Custom Queries
- Different Data Sources
  - JDBC Batch: reads the results of user-defined SQL statements.
  - MySQL CDC: reads database Binlog events. It internally uses Debezium and behaves like a MySQL replica that continuously receives row-level changes.
- Different Data Structures
  - JDBC Batch: returns ordinary row data.
  - MySQL CDC: produces structured change events containing:
    - Changed row values
    - Operation types (`op`)
    - Metadata
    - Before image
    - After image
Examples:
+I Insert
-U Before Update
+U After Update
-D Delete
A traditional SQL query cannot generate this event structure.
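To make the four operation types concrete, here is a minimal Python sketch of how a downstream consumer could replay such events against an in-memory table keyed by primary key. The function name and sample rows are hypothetical, and this is an illustration of the event semantics rather than how any particular sink is implemented.

```python
def apply_change(table: dict, op: str, row: dict, key: str = "id") -> None:
    """Apply one change event to a dict that maps primary key -> row."""
    if op in ("+I", "+U"):
        # Insert, or the after-image of an update: store the new row.
        table[row[key]] = row
    elif op in ("-U", "-D"):
        # Before-image of an update, or a delete: remove the old row.
        table.pop(row[key], None)
    else:
        raise ValueError(f"unknown operation type: {op}")


events = [
    ("+I", {"id": 1, "age": 20}),
    ("-U", {"id": 1, "age": 20}),
    ("+U", {"id": 1, "age": 21}),
    ("+I", {"id": 2, "age": 35}),
    ("-D", {"id": 2, "age": 35}),
]

target = {}
for op, row in events:
    apply_change(target, op, row)
print(target)
```

After the replay, only row 1 remains, carrying its updated age. A plain `SELECT` would show this final state but none of the intermediate steps.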
4. How to Implement Filtering and Transformation in CDC Mode
Although filtering cannot be performed in the Source, several alternatives are available.
| Method | Location | Description |
|---|---|---|
| Transform | SeaTunnel Transform Section | Recommended approach. Supports filtering, projection, renaming, and SQL processing. |
| Sink Processing | Sink Configuration | Some sinks support limited filtering or mapping capabilities. |
| Schema Evolution | Table Synchronization Layer | Sync selected columns to achieve indirect column filtering; requires advance schema changes and is typically used for long-term sync scenarios. |
5. CDC Can Monitor Multiple Tables Simultaneously
When multiple tables need to be synchronized using a single CDC job, SeaTunnel provides a simple configuration model.
The key idea is:
- One Source → Many Tables
- Automatic Sink Routing
Configuring Multi-Table CDC
Simply list all tables inside `table-names`.
Example:
source {
  MySQL-CDC {
    base-url = "jdbc:mysql://192.168.1.107:51382/cs1"
    username = "root"
    password = "zysoft"
    database-names = ["cs1"]
    table-names = [
      "cs1.t_8_100w",
      "cs1.order_table",
      "cs1.user_profile"
    ]
    startup.mode = "initial"
    server-id = 5400
    server-time-zone = "Asia/Shanghai"
  }
}
Sink Configuration: Automatic Routing
SeaTunnel JDBC Sink supports automatic table routing.
- Core Configuration
sink {
  jdbc {
    url = "jdbc:mysql://192.168.1.107:51382/cs2"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "zysoft"
    generate_sink_sql = true
    database = "cs2" # Target database
    # Key technique: dynamically map source table names using built-in variables
    table = "${table_name}"
    # table = "prefix_${table_name}" # Add a prefix if needed
    # table = "${database_name}_${table_name}_suffix" # Use database name and suffix
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
    batch_size = 5000
    # ... Other connection and performance tuning parameters remain unchanged
  }
}
Built-In Variables
SeaTunnel automatically provides:
- `${table_name}`
- `${database_name}`
During execution:
- cs1.t_8_100w -> cs2.t_8_100w
- cs1.order_table -> cs2.order_table
This creates a natural 1:1 table mapping.
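The substitution itself is easy to picture. The Python sketch below mimics it with the standard library's `string.Template`, which happens to use the same `${name}` placeholder syntax. It only illustrates the naming patterns from the sink example and makes no claim about how SeaTunnel resolves the variables internally; `resolve_target` is a hypothetical helper.

```python
from string import Template


def resolve_target(template: str, source_table: str) -> str:
    """Expand ${database_name} and ${table_name} for a "db.table" identifier."""
    database_name, table_name = source_table.split(".", 1)
    return Template(template).substitute(
        database_name=database_name,
        table_name=table_name,
    )


print(resolve_target("${table_name}", "cs1.t_8_100w"))
print(resolve_target("prefix_${table_name}", "cs1.order_table"))
print(resolve_target("${database_name}_${table_name}_suffix", "cs1.user_profile"))
```

Running such a helper over the `table-names` list before deployment is a quick way to preview which target tables a given pattern would produce.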
- Different Routing Strategies for Different Tables
If different tables require different write policies, configure multiple Sink definitions.
Example:
sink {
  # Sink 1: Dedicated to processing the t_8_100w table
  jdbc {
    # Explicitly define the target table
    table = "t_8_100w_target"
    # Consume only the named upstream dataset. Note: source_table_name refers to
    # a dataset registered by an upstream plugin (its result_table_name),
    # which is not necessarily the database table name.
    source_table_name = "t_8_100w"
    # ... Other configurations
  }
}
sink {
  # Sink 2: Dedicated to processing the order_table table
  # Different write strategies (such as data_save_mode) can be applied
  jdbc {
    table = "order_table_target"
    source_table_name = "order_table"
    # Example: Use a truncate-and-reload strategy for this table
    data_save_mode = "DROP_DATA"
    # ... Other configurations
  }
}
Reminders and Best Practices
- Target Tables
  Ensure compatible target tables already exist, or enable `schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"`.
- Performance Isolation
  All monitored tables share the same CDC stream. If one table generates massive changes, it may increase latency for others. For critical workloads, create separate CDC jobs.
- Initial Snapshot
  When using `startup.mode = "initial"`, all monitored tables are snapshotted during startup. Ensure sufficient database resources are available.
Summary:
For most multi-table CDC scenarios:
- List multiple tables in `table-names`.
- Configure `table = "${table_name}"` in the Sink.
SeaTunnel will automatically route data to matching target tables.
If special processing is required for specific tables, use Transform SQL, filters, or conditional routing to customize behavior.
- Routing and Filters
  If you want data to flow selectively into different sinks instead of being copied to every sink, configure a filter before each sink or use a more advanced mechanism such as side outputs (typically implemented through conditional expressions in the configuration file).
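💡 Configuration Recommendations for Multi-Table CDC Scenarios
Based on the questions discussed earlier, the following sketch illustrates the intent of synchronizing multiple CDC tables into different target tables. Treat the `filter` blocks and option values as pseudo-configuration, and verify every option against the connector documentation for your SeaTunnel version before use: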
source {
  MySQL-CDC {
    table-names = ["cs1.t_order", "cs1.t_user", "cs1.t_log"]
    # ... other configurations
  }
}
sink {
  # Order table -> Order Archive Database
  jdbc {
    table = "t_order_archive"
    # Use filter to synchronize only t_order data
    filter {
      source_table_name == "t_order"
    }
    # ...
  }
  # User table -> User Analytics Database
  jdbc {
    table = "t_user_analysis"
    filter {
      source_table_name == "t_user"
    }
    data_save_mode = "OVERWRITE" # Use overwrite strategy for this table
    # ...
  }
  # Log table -> Log Center (example writes to HDFS)
  hdfs {
    path = "/data/lake/log/${source_table_name}/dt=${now(date='yyyy-MM-dd')}"
    filter {
      source_table_name == "t_log"
    }
    # ...
  }
}
Steps to Implement MySQL CDC
1. Verify That MySQL Binlog Is Enabled
Use the following commands to check whether Binlog is enabled.
-- The simplest way to check
SHOW VARIABLES LIKE 'log_bin';
-- Shows the current binlog file name and position (on MySQL 8.4+, use SHOW BINARY LOG STATUS instead)
SHOW MASTER STATUS;
-- Verify binlog format
SHOW VARIABLES LIKE 'binlog_format';
-- Verify row image mode
SHOW VARIABLES LIKE 'binlog_row_image';
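The results of these checks can also be validated programmatically. The sketch below is a hypothetical pre-flight helper: it assumes you have already collected the `SHOW VARIABLES` output into a plain dict (how you run the queries is up to your MySQL client), and it compares the values against the settings this guide requires.

```python
# Settings this guide requires for MySQL CDC.
REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_binlog_settings(variables: dict) -> list:
    """Return a list of human-readable problems; empty means all checks pass."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = str(variables.get(name, "<missing>")).upper()
        if actual != expected:
            problems.append(f"{name} is {actual}, expected {expected}")
    return problems


# Example input, as it might look on a server still using MIXED format.
observed = {"log_bin": "ON", "binlog_format": "MIXED", "binlog_row_image": "FULL"}
for problem in check_binlog_settings(observed):
    print(problem)
```

An empty list means the server meets the prerequisites listed here; any entry points at the variable that needs to change.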
Enable Binlog If It Is Disabled
If Binlog is not enabled, modify the MySQL configuration file and restart MySQL.
[mysqld]
# Configure a unique server ID
server-id = 123
# Enable Binlog and specify the storage path
log_bin = /var/lib/mysql/mysql-bin
# Must be configured as ROW mode
binlog_format = ROW
# Must be configured as FULL
binlog_row_image = FULL
# Binlog retention period in days; at least 2 days is recommended
# (deprecated since MySQL 8.0 in favor of binlog_expire_logs_seconds)
expire_logs_days = 10
2. Common MySQL CDC Parameters
Single-Table MySQL CDC Demo
- Create Target Table
-- demo7-1-mysql-cdc2mysql-qxzh-st-107.conf
CREATE TABLE cs2.t_8_100w_imp_st_qxzh_cdc_demo7_1 (
  id BIGINT NOT NULL COMMENT 'Primary Key',
  user_name VARCHAR(2000) NULL COMMENT 'User Name',
  sex VARCHAR(20) NULL COMMENT 'Gender: Male/Female',
  decimal_f DECIMAL(32,6) NULL COMMENT 'Large Decimal Value',
  phone_number VARCHAR(20) COMMENT 'Phone Number',
  age INT NULL COMMENT 'Converted Age',
  create_time TIMESTAMP COMMENT 'Creation Time',
  description LONGTEXT NULL COMMENT 'Large Text',
  address VARCHAR(2000) NULL COMMENT 'Default Value for Empty Address: Unknown',
  PRIMARY KEY (id)
);
- Execute the Job
# demo7-1-mysql-cdc2mysql-qxzh-st-107.conf
sh /data/tools/seatunnel/seatunnel-2.3.12/bin/seatunnel.sh \
--config /data/tools/seatunnel/myconf/demo7-1-mysql-cdc2mysql-qxzh-st-107.conf \
-m local
- Complete Configuration File
# demo7-1-mysql-cdc2mysql-qxzh-st-107.conf
env {
  # Parallelism (number of threads)
  execution.parallelism = 5
  # Job mode:
  # BATCH = Batch Processing
  # STREAMING = Streaming Processing (required for CDC)
  job.mode = "STREAMING"
}
source {
  MySQL-CDC {
    base-url = "jdbc:mysql://ip:port/cs1"
    username = "root"
    password = "zysoft"
    # The query option is invalid in CDC mode
    # Source database
    database-names = ["cs1"]
    # Monitored tables
    # Table names must include database names
    table-names = ["cs1.t_8_100w"]
    # Startup mode:
    # initial = full snapshot + incremental changes
    # latest = incremental changes only
    startup.mode = "initial"
    # Startup timestamp
    # Required when startup.mode=timestamp
    # startup.timestamp
    # Very important!
    # CDC client unique ID
    # Example: 5400 or range 5400-6408
    # Must not conflict with existing MySQL server IDs
    server-id = 5400
    # Stop mode
    # stop.mode
    # Required when stop.mode=timestamp
    # stop.timestamp
    # Database session time zone
    server-time-zone = "Asia/Shanghai"
  }
}
# CDC transformations must be implemented in Transform
transform {
  # 1. Field mapping with the FieldMapper plugin (source_field = target_field)
  FieldMapper {
    field_mapper = {
      id = id
      name = user_name
      sex = sex
      decimal_f = decimal_f
      phone_number = phone_number
      age = age
      create_time = create_time
      description = description
    }
  }
  # 2. Phone number masking
  #    13812341234 -> 138****1234
  # 3. Age conversion
  #    String -> Integer
  # 4. Gender conversion
  #    1 -> Male
  #    2 -> Female
  # 5. Data filtering
  #    Keep only records where age > 25
  # 6. Address default value
  #    Empty address -> "Unknown"
}
sink {
  jdbc {
    url = "jdbc:mysql://ip:port/cs2"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "zysoft"
    # Automatically generate insert SQL
    # Can also create tables automatically
    generate_sink_sql = true
    # Required when generate_sink_sql=true
    database = "cs2"
    table = "t_8_100w_imp_st_qxzh_cdc_demo7_1"
    # Fail if the schema does not exist
    # Commonly used alternative: CREATE_SCHEMA_WHEN_NOT_EXIST
    schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"
    # APPEND_DATA: keep existing data and append new records
    data_save_mode = "APPEND_DATA"
    # DROP_DATA: clear the table before loading
    # data_save_mode = "DROP_DATA"
    batch_size = 5000
    # Retry attempts
    max_retries = 3
    # Connection timeout
    connection_check_timeout_sec = 300
    properties = {
      useUnicode = true
      characterEncoding = "utf8"
      serverTimezone = "Asia/Shanghai"
      # Enable batch rewrite
      rewriteBatchedStatements = "true"
      # Enable compression
      useCompression = "true"
      # Disable server-side prepared statements
      useServerPrepStmts = "false"
    }
  }
}
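The Transform section above lists rules 2 through 6 only as comments. To pin down what those rules mean, here is a minimal Python sketch of the intended row-level logic. It is not SeaTunnel transform configuration: the function names are hypothetical, and it assumes `age` arrives as a numeric string and `sex` as the codes 1 and 2.

```python
from typing import Optional

GENDER_LABELS = {"1": "Male", "2": "Female"}


def mask_phone(phone: Optional[str]) -> Optional[str]:
    """Keep the first 3 and last 4 characters: 13812341234 -> 138****1234."""
    if not phone or len(phone) < 7:
        return phone  # missing or too short to mask; left unchanged
    return phone[:3] + "****" + phone[-4:]


def transform_row(row: dict) -> Optional[dict]:
    """Apply rules 2-6; return None when the row is filtered out."""
    age = int(row["age"])                      # rule 3: string -> integer
    if age <= 25:                              # rule 5: keep only age > 25
        return None
    out = dict(row)
    out["age"] = age
    out["phone_number"] = mask_phone(row.get("phone_number"))              # rule 2
    out["sex"] = GENDER_LABELS.get(str(row.get("sex")), row.get("sex"))    # rule 4
    out["address"] = row.get("address") or "Unknown"                       # rule 6
    return out


sample = {"id": 1, "age": "30", "phone_number": "13812341234", "sex": "1", "address": ""}
print(transform_row(sample))
print(transform_row({**sample, "age": "20"}))
```

The first call returns the masked and converted row; the second returns `None` because the age filter rejects it.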
- Execution Result
After startup, the CDC job first performs the initial snapshot synchronization.
In this example:
- Total records read: 1,000,009
- Total records written: 1,000,009
- Job status: RUNNING
- Checkpoints continue to be generated successfully.
This indicates that:
- The initial full snapshot has completed successfully.
- Source and target data are consistent.
- Checkpointing is functioning correctly.
- The CDC job has entered continuous Binlog monitoring mode.
- The streaming pipeline remains active and ready to capture new changes in real time.
Result Demonstration
Once the initial synchronization is complete:
- INSERT operations are captured immediately.
- UPDATE operations are synchronized in real time.
- DELETE operations are propagated automatically.
- The CDC job continues running until manually stopped.
At this point, the pipeline behaves like a long-running subscription service, continuously consuming MySQL Binlog events and delivering them to downstream systems in real time.





