DEV Community

Apache SeaTunnel


From Batch to Real-Time: A Practical Guide to MySQL CDC in SeaTunnel

Real-Time Incremental Data Capture: Change Data Capture (CDC)

1. How Many Databases Does SeaTunnel Support for CDC?

Documentation:

https://seatunnel.apache.org/docs/2.3.12/connector-v2/source

Official CDC Source Connectors Supported by SeaTunnel:

  1. MongoDB CDC Source Connector
  2. MySQL CDC Source Connector
  3. OpenGauss CDC Source Connector
  4. Oracle CDC Source Connector
  5. PostgreSQL CDC Source Connector
  6. SQL Server CDC Source Connector
  7. TiDB CDC Source Connector

2. CDC Jobs Keep Running After SeaTunnel Starts

A standard CDC (Change Data Capture) job is designed to run continuously and indefinitely as a streaming service.

Core Difference

  • Offline / Batch Processing Jobs

    • job.mode = "BATCH"
    • Execute a full or incremental SQL query once.
    • The job automatically finishes after processing all existing data.
  • CDC / Streaming Jobs

    • job.mode = "STREAMING"
    • After startup, the job will:
        1. Optionally perform an initial full snapshot (when `startup.mode = "initial"` is configured).
        2. Wait on the MySQL Binlog stream and read it continuously.
        3. Immediately capture, process, and write any new database changes (INSERT, UPDATE, DELETE).
        4. Keep running until it is manually stopped or fails with an error.

How to Stop a CDC Job

In a SeaTunnel terminal session, you can usually stop a CDC job gracefully using:

Ctrl + C

In production environments, jobs are typically stopped through schedulers, orchestration systems, or management platforms.

In simple terms, a CDC job behaves like a persistent subscription service that subscribes to database change logs and processes events in real time whenever they occur.

This is fundamentally different from a traditional batch job that runs once and exits.
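The lifecycle difference can be modelled in a few lines of plain Python. This is a minimal sketch, not SeaTunnel code: a hypothetical `run_batch_job` consumes a finite list and returns, while a hypothetical `run_streaming_job` keeps polling a queue (standing in for the Binlog stream) until a `threading.Event` (standing in for Ctrl + C or a scheduler) is set.

```python
import queue
import threading


def run_batch_job(rows):
    """Batch: read a finite result set once, then finish."""
    written = [row for row in rows]  # "write" every row to the sink
    return written  # the job ends here


def run_streaming_job(changes, stop_event, poll_timeout=0.05):
    """Streaming: keep consuming change events until asked to stop."""
    written = []
    while not stop_event.is_set():
        try:
            event = changes.get(timeout=poll_timeout)
        except queue.Empty:
            continue  # no new changes yet; keep waiting instead of exiting
        written.append(event)  # "write" the change to the sink
        changes.task_done()
    return written
```

The streaming function never ends on its own when the queue runs dry; it only waits for more events, which is exactly why a CDC job keeps running.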

💡 Configuration Recommendations and Common Pitfalls

  1. About startup.mode

For first-time deployments, it is strongly recommended to use:

initial

This ensures:

  • A complete snapshot of current data is captured first.
  • Incremental synchronization seamlessly follows afterward.

If you configure:

latest

all historical data will be skipped and only changes occurring after startup will be captured.
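A simplified model of what reaches the sink under each mode may help. In this hypothetical `events_for_mode` helper, snapshot rows are represented as insert (`+I`) events followed by the incremental changes; it illustrates the semantics described above and is not the connector's implementation.

```python
def events_for_mode(mode, existing_rows, new_changes):
    """Return what downstream receives for a given startup mode (simplified)."""
    if mode == "initial":
        snapshot = [("+I", row) for row in existing_rows]  # full snapshot first
        return snapshot + list(new_changes)                 # then incremental changes
    if mode == "latest":
        return list(new_changes)                            # history is skipped
    raise ValueError(f"mode not covered by this sketch: {mode}")
```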

  2. About server-id

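In production environments, always set an explicit server-id. The connector joins the MySQL cluster like a replica, so the value (or every value in a range such as "5400-5408") must be unique among all MySQL servers and replication clients connected to that cluster.

If server-id is omitted, a random ID is generated. Avoid relying on that in production: a collision with another replica or CDC job can break the Binlog connection or cause unexpected behavior.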
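Before deploying, it can help to check a planned value against the IDs already in use. The sketch below assumes you have collected those IDs yourself (for example by running SELECT @@server_id on each MySQL server and noting the IDs of other CDC jobs); `parse_server_id` and `check_server_ids` are hypothetical helpers that accept either a single ID such as "5400" or a range such as "5400-5408".

```python
def parse_server_id(spec):
    """Parse '5400' or '5400-5408' into a list of IDs."""
    if "-" in spec:
        start, end = (int(part) for part in spec.split("-"))
        if start > end:
            raise ValueError("range start must not exceed range end")
        return list(range(start, end + 1))
    return [int(spec)]


def check_server_ids(spec, ids_in_use):
    """Return the planned IDs, or raise if any of them is already taken."""
    ids = parse_server_id(spec)
    conflicts = sorted(set(ids) & set(ids_in_use))
    if conflicts:
        raise ValueError(f"server-id conflicts with existing servers: {conflicts}")
    return ids
```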

3. CDC Mode vs JDBC Mode

Comparison

| Feature | MySQL CDC (Streaming) | JDBC Batch Processing |
|---|---|---|
| Job mode | STREAMING | BATCH |
| Data source | Database Binlog | SQL query result set |
| Data content | Change event stream (operation type, before/after images, metadata) | Static data snapshot |
| Core configuration | `table-names` | `query` (or a table path) |
| Synchronization type | Real-time incremental (optionally preceded by a full snapshot) | One-time full or incremental batch |
| Job lifecycle | Runs continuously until manually stopped | Ends automatically after reading all data |
| Typical use cases | Real-time warehouses, analytics, disaster recovery | T+1 reports, migrations, historical backfill |
| Source database load | Low, continuous Binlog reading | Query-intensive batch execution |
| Data consistency | Exactly-once reading with checkpoints (end-to-end guarantees also depend on the sink) | Depends on query conditions; duplicates or omissions may occur |

Architecture Comparison

Under CDC mode, you cannot use a custom SQL query in the source connector like you would with JDBC.

For example:

SELECT * FROM table WHERE age > 20

is not supported in the CDC Source.

If filtering or transformation is required, it has to happen downstream of the source, normally in the Transform stage.

Why CDC Does Not Support Custom Queries

  1. Different Data Sources

JDBC Batch

  • Reads the results of user-defined SQL statements.

MySQL CDC

  • Reads database Binlog events.
  • Internally uses Debezium.
  • Behaves like a MySQL replica that continuously receives row-level changes.

  2. Different Data Structures

JDBC Batch

Returns ordinary row data.

MySQL CDC

Produces structured change events containing:

  • Changed row values
  • Operation types (op)
  • Metadata
  • Before image
  • After image

Examples:

+I  Insert
-U  Before Update
+U  After Update
-D  Delete

A traditional SQL query cannot generate this event structure.
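To see why this structure matters, consider how a consumer applies such events to a target keyed by primary key. The hypothetical `apply_changes` function below is a minimal sketch assuming an upsert-style target: `+I` and `+U` write the row, `-D` removes it, and `-U` (the before image) needs no action here. A plain query result carries no operation type, so it could not drive this logic.

```python
def apply_changes(table, events, key="id"):
    """Apply +I/-U/+U/-D change events to a dict keyed by primary key."""
    for kind, row in events:
        pk = row[key]
        if kind in ("+I", "+U"):
            table[pk] = dict(row)      # insert, or the after image of an update
        elif kind == "-D":
            table.pop(pk, None)        # delete the row if present
        elif kind == "-U":
            pass                       # before image: nothing to do for an upsert target
        else:
            raise ValueError(f"unknown row kind: {kind}")
    return table
```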

4. How to Implement Filtering and Transformation in CDC Mode

Although filtering cannot be performed in the Source, several alternatives are available.

| Method | Location | Description |
|---|---|---|
| Transform | SeaTunnel Transform section | Recommended approach. Supports filtering, projection, renaming, and SQL processing. |
| Sink processing | Sink configuration | Some sinks support limited filtering or mapping capabilities. |
| Schema evolution | Table synchronization layer | Syncs only selected columns to filter columns indirectly; the target schema must be prepared in advance, so this is typically used for long-running sync jobs. |
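As a rough illustration of what the Transform stage does with the `age > 20` condition from earlier, here is a plain-Python sketch (not SeaTunnel's transform API). The hypothetical `transform` function filters, projects, and renames fields on each change event while passing the row kind through unchanged.

```python
def transform(events, predicate, columns, renames):
    """Filter, project, and rename fields of (row_kind, row) change events."""
    out = []
    for kind, row in events:
        if not predicate(row):
            continue                                               # filtering
        projected = {c: row[c] for c in columns}                   # projection
        renamed = {renames.get(c, c): v for c, v in projected.items()}  # renaming
        out.append((kind, renamed))  # the row kind (+I, -U, +U, -D) is kept as-is
    return out
```

Because this filter looks at one row image at a time, the before and after images of an update are evaluated separately, so an update that moves a row across the filter boundary may pass only one of them.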

5. CDC Can Monitor Multiple Tables Simultaneously

When multiple tables need to be synchronized using a single CDC job, SeaTunnel provides a simple configuration model.

The key idea is:

  • One Source → Many Tables
  • Automatic Sink Routing

Configuring Multi-Table CDC

Simply list all tables in the table-names option.

Example:

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://192.168.1.107:51382/cs1"
    username = "root"
    password = "zysoft"

    database-names = ["cs1"]

    table-names = [
      "cs1.t_8_100w",
      "cs1.order_table",
      "cs1.user_profile"
    ]

    startup.mode = "initial"
    server-id = 5400
    server-time-zone = "Asia/Shanghai"
  }
}

Sink Configuration: Automatic Routing

SeaTunnel JDBC Sink supports automatic table routing.

  1. Core Configuration
sink {
  jdbc {

    url = "jdbc:mysql://192.168.1.107:51382/cs2"

    driver = "com.mysql.cj.jdbc.Driver"

    user = "root"

    password = "zysoft"

    generate_sink_sql = true

    database = "cs2"    # Target database

    # Key technique: dynamically map source table names using built-in variables
    table = "${table_name}"

    # table = "prefix_${table_name}"                  # Add a prefix if needed

    # table = "${database_name}_${table_name}_suffix" # Use database name and suffix

    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"

    data_save_mode = "APPEND_DATA"

    batch_size = 5000

    # ... Other connection and performance tuning parameters remain unchanged

  }
}

Built-In Variables

SeaTunnel automatically provides:

${table_name}
${database_name}

During execution:

cs1.t_8_100w    -> cs2.t_8_100w
cs1.order_table -> cs2.order_table

This creates a natural 1:1 table mapping.
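The substitution itself is simple string templating. The sketch below is a hypothetical `resolve_target` helper that expands only the two variables named above; it assumes ${database_name} refers to the source database, and SeaTunnel's own resolver may support more variables than this.

```python
import re

PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def resolve_target(template, database_name, table_name):
    """Expand ${database_name} and ${table_name} in a sink table template."""
    values = {"database_name": database_name, "table_name": table_name}

    def substitute(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"unsupported placeholder in this sketch: {name}")
        return values[name]

    return PLACEHOLDER.sub(substitute, template)
```

For example, the source table cs1.t_8_100w with table = "${table_name}" and database = "cs2" resolves to cs2.t_8_100w, while the prefix and suffix variants from the commented-out lines produce names such as prefix_order_table or cs1_order_table_suffix.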

  2. Different Routing Strategies for Different Tables

If different tables require different write policies, configure multiple Sink definitions.

Example:

sink {

  # Sink 1: Dedicated to processing the t_8_100w table
  jdbc {

    # Explicitly define the target table
    table = "t_8_100w_target"

    # Route only data from the specified source table to this sink
    source_table_name = "t_8_100w"

    # ... Other configurations

  }

  # Sink 2: Dedicated to processing the order_table table
  # Different write strategies (such as data_save_mode) can be applied
  jdbc {

    table = "order_table_target"

    source_table_name = "order_table"

    # Example: Use a truncate-and-reload strategy for this table
    data_save_mode = "DROP_DATA"

    # ... Other configurations

  }
}
  3. Reminders and Best Practices

    1. Target tables: Ensure compatible target tables already exist, or enable schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST".
    2. Performance isolation: All monitored tables share the same CDC stream. If one table generates a large volume of changes, it can increase latency for the others. For critical workloads, create separate CDC jobs.
    3. Initial snapshot: When using startup.mode = "initial", all monitored tables are snapshotted during startup. Make sure the source database has enough resources to handle this load.
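The performance-isolation point can be illustrated with a deliberately simplified model: one ordered stream, with events processed one at a time at a fixed cost. The hypothetical `first_event_delay` function reports how long the first event of each table waits behind everything ahead of it; real pipelines batch and parallelize, so treat the numbers as intuition only.

```python
def first_event_delay(stream, ms_per_event):
    """For each table, how long its first event waits in one shared, ordered stream."""
    delays = {}
    for position, table in enumerate(stream):
        # Every event ahead of this one must be processed first.
        delays.setdefault(table, position * ms_per_event)
    return delays
```

With 1,000 t_log events queued ahead of one t_order change at 0.5 ms each, the t_order event waits 500 ms; in a separate job it would not wait at all.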

Summary:

For most multi-table CDC scenarios:

  1. List multiple tables in table-names.
  2. Configure table = "${table_name}" in the Sink.

SeaTunnel will automatically route data to matching target tables.

If special processing is required for specific tables, use:

  • Transform SQL
  • Filters
  • Conditional routing

to customize behavior.

Routing and filters: If you want data to flow selectively into different sinks instead of being copied to every sink, you must configure a filter before each sink or use more advanced mechanisms such as side outputs (typically implemented through conditional expressions in the configuration file).

💡 Configuration Recommendations for Multi-Table CDC Scenarios

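Based on the questions discussed earlier, a configuration for synchronizing multiple CDC tables into different target tables could look like the following (routing syntax varies between SeaTunnel versions, so check it against the documentation for your version):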

source {
  MySQL-CDC {
    table-names = ["cs1.t_order", "cs1.t_user", "cs1.t_log"]
    # ... other configurations
  }
}

sink {

  # Order table -> Order Archive Database
  jdbc {
    table = "t_order_archive"

    # Use filter to synchronize only t_order data
    filter {
      source_table_name == "t_order"
    }

    # ...
  }

  # User table -> User Analytics Database
  jdbc {
    table = "t_user_analysis"

    filter {
      source_table_name == "t_user"
    }

    data_save_mode = "DROP_DATA"   # Clear existing rows before writing (the JDBC sink has no OVERWRITE mode)

    # ...
  }

  # Log table -> Log Center (example writes to HDFS)
  HdfsFile {
    path = "/data/lake/log/${source_table_name}/dt=${now(date='yyyy-MM-dd')}"

    filter {
      source_table_name == "t_log"
    }

    # ...
  }
}
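The routing rule behind this configuration can be sketched independently of SeaTunnel's syntax. In the hypothetical `route` function below, each sink is mapped to one source table, and every change event is delivered only to the sinks whose table matches; without such a mapping, every sink would receive every table's events.

```python
def route(events, routes):
    """Deliver each (table, row_kind, row) event only to sinks mapped to that table."""
    outputs = {sink: [] for sink in routes}
    for table, kind, row in events:
        for sink, source_table in routes.items():
            if table == source_table:
                outputs[sink].append((kind, row))
    return outputs
```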

Steps to Implement MySQL CDC

1. Verify That MySQL Binlog Is Enabled

Use the following commands to check whether Binlog is enabled.

-- The simplest way to check (expected: ON)
SHOW VARIABLES LIKE 'log_bin';

-- Shows the current binlog file name and position
-- (on MySQL 8.2 and later, use SHOW BINARY LOG STATUS instead)
SHOW MASTER STATUS;

-- Verify the binlog format (expected: ROW)
SHOW VARIABLES LIKE 'binlog_format';

-- Verify the row image mode (expected: FULL)
SHOW VARIABLES LIKE 'binlog_row_image';
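If you check many servers, the expected values above can be verified programmatically. This is a minimal sketch that assumes you have already loaded the SHOW VARIABLES output into a dictionary of Variable_name to Value (how you query MySQL is left out); `check_binlog_settings` is a hypothetical helper.

```python
REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_binlog_settings(variables):
    """Return a list of problems; an empty list means the settings look CDC-ready."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = variables.get(name)
        if actual is None:
            problems.append(f"{name} is missing")
        elif str(actual).upper() != expected:
            problems.append(f"{name} = {actual}, expected {expected}")
    return problems
```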

Enable Binlog If It Is Disabled

If Binlog is not enabled, modify the MySQL configuration file and restart MySQL.

[mysqld]

# Configure a unique server ID
server-id = 123

# Enable Binlog and specify the storage path
log_bin = /var/lib/mysql/mysql-bin

# Must be configured as ROW mode
binlog_format = ROW

# Must be configured as FULL
binlog_row_image = FULL

# Binlog retention period (at least 2 days is recommended)
expire_logs_days = 10
# expire_logs_days is deprecated as of MySQL 8.0; the equivalent setting there is:
# binlog_expire_logs_seconds = 864000
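The retention period matters whenever the CDC job is stopped: when it is restored, it continues from the Binlog position recorded in its last checkpoint, and that position must still exist on the server. The sketch below converts days to a binlog_expire_logs_seconds value and checks a planned downtime against the retention window. It is a simplification under that assumption: MySQL purges whole binlog files, so the real cutoff is not exact.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def retention_seconds(days):
    """Convert a retention period in days to a binlog_expire_logs_seconds value."""
    return days * SECONDS_PER_DAY


def can_resume(retention_days, downtime_hours):
    """True if a job stopped for downtime_hours should still find its Binlog position."""
    return downtime_hours * 3600 < retention_seconds(retention_days)
```

With the recommended minimum of 2 days, a job that is down for 36 hours can still resume, but one that is down for 72 hours would find its position already purged.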

2. Common MySQL CDC Parameters

Single-Table MySQL CDC Demo

  • Create Target Table
-- demo7-1-mysql-cdc2mysql-qxzh-st-107.conf

CREATE TABLE cs2.t_8_100w_imp_st_qxzh_cdc_demo7_1 (
  id BIGINT NOT NULL COMMENT 'Primary Key',
  user_name VARCHAR(2000) NULL COMMENT 'User Name',
  sex VARCHAR(20) NULL COMMENT 'Gender: Male/Female',
  decimal_f DECIMAL(32,6) NULL COMMENT 'Large Decimal Value',
  phone_number VARCHAR(20) COMMENT 'Phone Number',
  age INT NULL COMMENT 'Converted Age',
  create_time TIMESTAMP COMMENT 'Creation Time',
  description LONGTEXT NULL COMMENT 'Large Text',
  address VARCHAR(2000) NULL COMMENT 'Default Value for Empty Address: Unknown',
  PRIMARY KEY (id)
);
  • Execute the Job
# demo7-1-mysql-cdc2mysql-qxzh-st-107.conf

sh /data/tools/seatunnel/seatunnel-2.3.12/bin/seatunnel.sh \
--config /data/tools/seatunnel/myconf/demo7-1-mysql-cdc2mysql-qxzh-st-107.conf \
-m local
  • Complete Configuration File
# demo7-1-mysql-cdc2mysql-qxzh-st-107.conf

env {

  # Parallelism (number of threads)
  execution.parallelism = 5

  # Job mode:
  # BATCH = Batch Processing
  # STREAMING = Streaming Processing (required for CDC)
  job.mode = "STREAMING"
}

source {

  MySQL-CDC {

    base-url = "jdbc:mysql://ip:port/cs1"

    username = "root"

    password = "zysoft"

    # The query option is not supported in CDC mode

    # Source database
    database-names = ["cs1"]

    # Monitored tables
    # Table names must include database names
    table-names = ["cs1.t_8_100w"]

    # Startup mode:
    # initial   = full snapshot + incremental changes
    # latest    = incremental changes only
    # timestamp = incremental changes from startup.timestamp (see below)
    startup.mode = initial

    # Startup timestamp
    # Required when startup.mode=timestamp
    # startup.timestamp

    # Very important!
    # CDC client unique ID
    # Example: 5400 or range 5400-6408
    # Must not conflict with existing MySQL server IDs
    server-id = 5400

    # Stop mode
    # stop.mode

    # Required when stop.mode=timestamp
    # stop.timestamp

    # Database session time zone
    server-time-zone = "Asia/Shanghai"
  }
}

# CDC transformations must be implemented in Transform

transform {

  # 1. Field mapping with the FieldMapper plugin
  # (a Sql transform is an alternative way to rename and select fields)

  FieldMapper {

    field_mapper = {

      id = id

      name = user_name

      sex = sex

      decimal_f = decimal_f

      phone_number = phone_number

      age = age

      create_time = create_time

      description = description
    }
  }

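  # Steps 2-6 below are planned transformations listed as comments only;
  # they are not configured in this demo.

  # 2. Phone number masking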
  # 13812341234 -> 138****1234

  # 3. Age conversion
  # String -> Integer

  # 4. Gender conversion
  # 1 -> Male
  # 2 -> Female

  # 5. Data filtering
  # Keep only records where age > 25

  # 6. Address default value
  # Empty address -> "Unknown"
}

sink {

  jdbc {

    url = "jdbc:mysql://ip:port/cs2"

    driver = "com.mysql.cj.jdbc.Driver"

    user = "root"

    password = "zysoft"

    # Automatically generate the write SQL from the table schema
    # (table creation is controlled separately by schema_save_mode)
    generate_sink_sql = true

    # Required when generate_sink_sql=true
    database = cs2

    table = "t_8_100w_imp_st_qxzh_cdc_demo7_1"

    # ERROR_WHEN_SCHEMA_NOT_EXIST: fail if the target table does not exist
    # (the table was created manually above)
    # Commonly used alternative: CREATE_SCHEMA_WHEN_NOT_EXIST
    schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"

    # APPEND_DATA
    # Keep existing data and append new records
    data_save_mode = "APPEND_DATA"

    # DROP_DATA
    # Clear table before loading
    # data_save_mode = "DROP_DATA"

    batch_size = 5000

    # Retry attempts
    max_retries = 3

    # Timeout (seconds) for checking that the connection is still valid
    connection_check_timeout_sec = 300

    properties = {

      useUnicode = true

      characterEncoding = "utf8"

      serverTimezone = "Asia/Shanghai"

      # Enable batch rewrite
      rewriteBatchedStatements = "true"

      # Enable compression
      useCompression = "true"

      # Disable server-side prepared statements
      useServerPrepStmts = "false"
    }
  }
}
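The transform block above lists steps 2 to 6 only as comments. As a plain-Python sketch of the intended row logic (not SeaTunnel transform configuration), the hypothetical `transform_row` below applies them to one row, assuming, as the comments suggest, that `age` arrives as a string and `sex` is coded as 1 or 2. In a real job, these rules could be expressed with SeaTunnel transforms such as a Sql transform.

```python
GENDER = {"1": "Male", "2": "Female"}


def mask_phone(phone):
    """13812341234 -> 138****1234 (keep the first 3 and last 4 digits)."""
    if not phone or len(phone) < 7:
        return phone
    return phone[:3] + "*" * (len(phone) - 7) + phone[-4:]


def transform_row(row):
    """Apply steps 2-6 to one row; return None when the row is filtered out."""
    out = dict(row)
    out["phone_number"] = mask_phone(out.get("phone_number"))       # step 2: masking
    age = out.get("age")
    out["age"] = int(age) if age not in (None, "") else None         # step 3: string -> int
    out["sex"] = GENDER.get(str(out.get("sex")), out.get("sex"))     # step 4: 1/2 -> Male/Female
    if out["age"] is None or out["age"] <= 25:                       # step 5: keep age > 25
        return None
    if not out.get("address"):                                       # step 6: default address
        out["address"] = "Unknown"
    return out
```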
  • Execution Result

After startup, the CDC job first performs the initial snapshot synchronization.

In this example:

  • Total records read: 1,000,009
  • Total records written: 1,000,009
  • Job status: RUNNING
  • Checkpoints continue to be generated successfully.

This indicates that:

  1. The initial full snapshot has completed successfully.
  2. Source and target row counts match.
  3. Checkpointing is functioning correctly.
  4. The CDC job has entered continuous Binlog monitoring mode.
  5. The streaming pipeline remains active and ready to capture new changes in real time.
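Matching row counts are a good sign, but they do not prove that every value arrived intact. A stricter check compares rows by primary key. The sketch below is a hypothetical `compare_tables` helper working on rows you have already fetched from both sides; for this demo you would first apply the same field mapping (for example name to user_name) to the source rows.

```python
import hashlib
import json


def row_digest(row):
    """Stable fingerprint of one row (keys sorted so column order does not matter)."""
    encoded = json.dumps(row, sort_keys=True, default=str).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


def compare_tables(source_rows, target_rows, key="id"):
    """Report keys missing on either side and keys whose values differ."""
    src = {r[key]: row_digest(r) for r in source_rows}
    dst = {r[key]: row_digest(r) for r in target_rows}
    return {
        "missing_in_target": sorted(src.keys() - dst.keys()),
        "extra_in_target": sorted(dst.keys() - src.keys()),
        "different": sorted(k for k in src.keys() & dst.keys() if src[k] != dst[k]),
    }
```

In the first test case both sides hold two rows, yet one value differs, which a count comparison alone would miss.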

Result Demonstration

Once the initial synchronization is complete:

  • INSERT operations are captured immediately.
  • UPDATE operations are synchronized in real time.
  • DELETE operations are propagated automatically.
  • The CDC job continues running until manually stopped.

At this point, the pipeline behaves like a long-running subscription service, continuously consuming MySQL Binlog events and delivering them to downstream systems in real time.

