Synchronizing and Merging Data from MySQL to MySQL Using Apache SeaTunnel
Author | Chen Fei, Big Data Engineer at ChinaPay
Today, I’d like to share a simple but common scenario of MySQL-to-MySQL data synchronization and merging. This case reflects a problem I encountered in real work, and I hope it can spark discussion. I welcome more experienced peers to share insights and ideas.
- Version requirement: Apache SeaTunnel 2.3.9
Scenario Description
In our business system, there are two MySQL source databases:
- `source_a`
- `source_b`
Both databases contain a table with the same structure, but data comes from different business lines. Data is generated simultaneously on both sides, which leads to primary key conflicts.
Our goal is to merge the tables from these two sources into a single target database (call it `C`) for unified analysis and querying.
Challenges Faced
- Although the two source tables have the same structure, primary keys may conflict, which must be avoided.
- In the future, there may be differences in fields or new fields added.
- The synchronization process should be as real-time as possible, without generating duplicate data.
Solution
We implemented the synchronization and merging solution as follows:
Create the target table in database C:
- The table structure needs to cover all fields from both source tables (currently identical, but may expand in the future).
- Add an extra field, `data_source`, to indicate the data origin (`source_a` or `source_b`).
- Non-nullable fields must have default values.
Set composite primary key and unique constraint:
- Use the original primary key plus `data_source` as a composite primary key, so overlapping keys from the two sources cannot conflict.
Use two separate SeaTunnel processes for data synchronization:
- Use the MySQL CDC connector to monitor `source_a` and `source_b` respectively.
- Add a `data_source` field to each record.
- Use the JDBC Sink to write data into database C.
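As a sketch, the target table in C could be created like this (the table and business column names are illustrative; only the `data_source` field and the composite key come from the design above):

```sql
-- Sketch of the target table in database C (names are illustrative).
CREATE TABLE orders_merged (
    id          BIGINT        NOT NULL,                               -- original primary key from the source table
    order_no    VARCHAR(64)   NOT NULL DEFAULT '',                    -- non-nullable fields carry defaults
    amount      DECIMAL(10,2) NOT NULL DEFAULT 0.00,
    updated_at  DATETIME      NOT NULL DEFAULT CURRENT_TIMESTAMP,
    data_source VARCHAR(16)   NOT NULL,                               -- 'source_a' or 'source_b'
    PRIMARY KEY (id, data_source)                                     -- composite key avoids cross-source conflicts
);
```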
Practical Demo
Let’s get straight to the practical part. Since the basics of SeaTunnel have been covered in the previous article, we won’t repeat them here.
Preparations Before Using MySQL CDC
To use the `mysql-cdc` connector, there are two prerequisites:
- The MySQL source must have binlog enabled:
  - `binlog_format` must be set to `ROW`.
  - `binlog_row_image` must be set to `FULL`.
  - Check the current configuration:
-
```sql
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';
```

If not enabled, add the following to `my.cnf` and restart MySQL:

```ini
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
binlog-row-image = FULL
```
Detailed permission descriptions and setup can be found in the official documentation. It is recommended to review it.
- Prepare a user account with replication privileges:
```sql
-- Create CDC user
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'your_password';
-- Grant necessary privileges
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
FLUSH PRIVILEGES;
```
Preparing SeaTunnel Runtime Package and Plugins
- Option 1: Download official binary package
Suitable when the server can access the internet and no custom plugins are needed.
- Download from the official Releases page.
- Manually add plugins and driver dependencies (e.g., `mysql-cdc`, `jdbc`).
- Plugin installation instructions can be found in the official documentation:

```shell
wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz"
# Keep the needed plugins listed in config/plugin_config
bin/install-plugin.sh
```
- Option 2: Clone the source code from GitHub and build manually
Suitable if you need special plugin support or want all plugins included by default.
```shell
sh ./mvnw clean install -DskipTests -Dskip.spotless=true
# The generated package:
# seatunnel-dist/target/apache-seatunnel-2.3.9-bin.tar.gz
```
The manually built package already includes all plugins and dependencies; no extra steps are required.
Plugins used in this case:
- `mysql-cdc`
- `jdbc`
Plugin documentation and driver dependencies are also available in the official docs.
Overview of Apache SeaTunnel Deployment Methods
SeaTunnel supports multiple deployment methods:
- Using the built-in engine (Zeta)
- Running as a Spark/Flink job
Three modes for Zeta engine:
| Mode | Description | Usage Suggestion |
|---|---|---|
| Local Mode | Runs as a single local process; jobs are not maintainable | For testing |
| Mixed Mode | Master and Worker share one process; prone to resource contention | Not recommended |
| Separate Mode | Master and Worker are deployed separately, with independent resources and high stability | Recommended |
Configuration File Structure
Once the cluster is set up, we prepare configuration files.
Generally, SeaTunnel configuration files can be divided into four parts:
- Env: Engine-related configuration
- Source: Source data reading configuration
- Transform: Data transformation (optional)
- Sink: Target database writing configuration
Env (Engine Configuration)
- `parallelism`: Task parallelism. Higher values speed up execution but should match available resources.
- `job.mode`: Job execution mode. Since we use `mysql-cdc`, it must be set to streaming.
- `checkpoint.interval`: Checkpoint interval. Streaming mode defaults to 30 seconds; adjust if needed.
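Put together, an env block might look like the following sketch (values are illustrative; check the defaults for your SeaTunnel version):

```hocon
env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 30000   # milliseconds
}
```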
Source Configuration (MySQL CDC)
For `mysql-cdc`, configure:
- Connection info: database host, username, password, etc.
- Database/table selection: specify via `database-names` and `table-names`, or use a regex for pattern matching.
- `startup.mode`: CDC startup mode. The default is a full snapshot followed by incremental reads, which suits most sync scenarios.
- `server-id`: MySQL CDC reader ID. Optional, but recommended to avoid conflicts with existing replica IDs.
- MySQL prerequisites:
  - Binlog is enabled.
  - `binlog_format` is `ROW`.
  - `binlog_row_image` is `FULL`.
  - The account has privileges to read binlogs, replicate, and query all tables.
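A source block for `source_a` might look like this sketch (hostnames, table names, and the `server-id` value are illustrative; verify option names against the mysql-cdc connector documentation for your SeaTunnel version):

```hocon
source {
  MySQL-CDC {
    base-url = "jdbc:mysql://source-a-host:3306/source_a"
    username = "cdc_user"
    password = "your_password"
    table-names = ["source_a.orders"]
    startup.mode = "initial"        # full snapshot first, then incremental changes
    server-id = 5400                # must not collide with existing replica IDs
    result_table_name = "cdc_a"     # handed to the transform stage
  }
}
```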
Transform Configuration (Optional)
In this case, we add a field to each record indicating its data source: `data_source`, with the value `source_a` or `source_b`.

This transformation uses the SQL plugin to add a constant column with the source info.
- Each source table can have its own transformation rules.
- `source_table` is a reserved word referring to the output of the previous processing step.
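For the `source_a` job, the SQL transform might look like this sketch (the registered table names are illustrative; the `source_table` keyword follows the note above):

```hocon
transform {
  Sql {
    source_table_name = "cdc_a"
    result_table_name = "cdc_a_tagged"
    # Add a constant column marking the origin of each record
    query = "SELECT *, 'source_a' AS data_source FROM source_table"
  }
}
```

The `source_b` job is identical except that the constant is `'source_b'`.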
Sink Configuration
The JDBC Sink is configured with:
- Target database address, driver, username, password, etc.
- Automatic generation of insert SQL based on the target table structure.
- Custom SQL, if fields or structure differ from the target table.
Summary
After combining these configurations, we can synchronize multiple source databases (e.g., `source_a` and `source_b`) to the target database in real time.
Additionally, the data is labeled with the source and written consistently. This approach supports complex data structures and flexible business scenarios—suitable for real production data integration cases.
Sink Writing Optimization and Validation
We can also optimize Sink write performance:
Batch Writing Strategy
- Batch size and write interval: data is flushed as soon as either threshold is reached.
Key Configuration Parameters
- `schema_save_mode`: Schema save strategy. If the table already exists, ignore it; if not, create it automatically from the upstream schema.
- `data_save_mode`: Data save strategy; here we use append mode.
- `support_upsert_by_query_primary_key_exist`: Enable upsert on primary key conflicts; enabled in this case.
- `primary_keys`: Primary keys used for writing; must include the original table key plus the `data_source` field added in the transform phase.
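Assembled, a JDBC sink block with these options might look like the following sketch (connection details and the table name are illustrative; option names should be checked against the JDBC sink documentation for your version):

```hocon
sink {
  jdbc {
    url = "jdbc:mysql://target-c-host:3306/c"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "sync_user"
    password = "your_password"
    database = "c"
    table = "orders_merged"
    generate_sink_sql = true
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"   # create the table if it is missing
    data_save_mode = "APPEND_DATA"                      # append; no truncation of existing data
    support_upsert_by_query_primary_key_exist = true    # upsert on primary key conflicts
    primary_keys = ["id", "data_source"]                # original key + the added source tag
    batch_size = 1000
  }
}
```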
Submitting Tasks
```shell
./seatunnel.sh --config ../config/demo/collect_a.config -e cluster --cluster sz-seatunnel --name collect_a --async
./seatunnel.sh --config ../config/demo/collect_b.config -e cluster --cluster sz-seatunnel --name collect_b --async

# Explanation:
# --config: configuration file
# -e: execution mode (cluster/local)
# --cluster: cluster name
# --name: job name
# --async: run in the background
```
Validation of Actual Run
Now that the configuration is complete, let’s see the results:
1. Tables `a` and `b` contain data; `c` is empty.
2. Run the `a` sync job, then check table `c`: data from `a` is written with `data_source = source_a`.
3. Run the `b` sync job, then check table `c`: data from `b` is written with `data_source = source_b`.
4. Modify some data in `a`. With the batch write strategy, wait about 2 seconds and check `c`: the corresponding rows are updated as expected.
This completes the entire data synchronization and merging process. 🙏
I hope this case provides some inspiration. You’re welcome to share more of your Apache SeaTunnel experiences so we can learn together!