⭐ Star Apache SeaTunnel on GitHub:
https://github.com/apache/seatunnel
Overview
In many real-world scenarios, multiple databases may contain tables with similar data. For example, data from different business systems may be stored in tables with the same schema but located in separate databases.
When these tables need to be consolidated into a single table for reporting and analytics, a common challenge arises: because the source tables share the same primary key design, directly merging the data results in duplicate primary keys.
This article walks through how Apache SeaTunnel resolves the issue: each row is tagged with its source database during synchronization, and that tag becomes part of a composite primary key in the destination table.
Solution Design
This solution synchronizes the test table from two independent MySQL databases (source1 and source2) into the test table of a third database (source3, which is the destination despite its name) in real time.
To support real-time synchronization and capture data changes, the solution uses Change Data Capture (CDC) based on the MySQL Binary Log (Binlog).
To prevent duplicate auto-increment primary key IDs from different source tables, the destination table introduces an additional sources column. This column, together with the original id, forms a composite primary key, ensuring both data uniqueness and traceability.
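The collision and its fix can be reproduced in miniature. The following is a minimal Python sketch, not part of the SeaTunnel setup: it uses the standard-library sqlite3 module as a stand-in for MySQL (an assumption made only to keep the example self-contained), and the table names merged_by_id and merged_by_id_and_source are hypothetical.

```python
import sqlite3

# Rows as they exist in the two source tables: the ids overlap.
SOURCE_ROWS = {
    "source1": [(1, "张三"), (2, "李四")],
    "source2": [(1, "王五"), (2, "赵六")],
}

conn = sqlite3.connect(":memory:")
# Destination keyed on id alone: rows from the second source collide.
conn.execute(
    "CREATE TABLE merged_by_id (id INTEGER NOT NULL, name TEXT, PRIMARY KEY (id))"
)
# Destination keyed on (id, sources): every row stays unique and traceable.
conn.execute(
    "CREATE TABLE merged_by_id_and_source ("
    "id INTEGER NOT NULL, name TEXT, sources TEXT NOT NULL, "
    "PRIMARY KEY (id, sources))"
)

collisions = 0
for source, rows in SOURCE_ROWS.items():
    for row_id, name in rows:
        try:
            conn.execute("INSERT INTO merged_by_id VALUES (?, ?)", (row_id, name))
        except sqlite3.IntegrityError:
            collisions += 1  # duplicate primary key in the single-column design
        conn.execute(
            "INSERT INTO merged_by_id_and_source VALUES (?, ?, ?)",
            (row_id, name, source),
        )

merged = conn.execute(
    "SELECT id, name, sources FROM merged_by_id_and_source ORDER BY id, sources"
).fetchall()
print("collisions with single-column key:", collisions)
for row in merged:
    print(row)
```

With the single-column key, both rows from source2 are rejected; with the composite key, all four rows are stored and each one records where it came from.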
Prerequisites
MySQL 5.7
Apache SeaTunnel 2.3.12
Verify the MySQL Configuration
First, verify that MySQL binary logging (Binlog) is enabled and uses the ROW format.
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+-------+
| Variable_name | Value |
+--------------------------+-------+
| binlog_format | ROW |
| binlog_row_image | FULL |
| enforce_gtid_consistency | OFF |
| gtid_mode | OFF |
| log_bin | ON |
+--------------------------+-------+
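If log_bin is not ON or binlog_format is not ROW, add the following settings to the [mysqld] section of the MySQL configuration file (for example my.cnf; the file name and location vary by installation):
[mysqld]
log-bin=mysql-bin
server-id=1
binlog_format=ROW
binlog_checksum=NONE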
Restart the MySQL service after updating the configuration.
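The check at the start of this section can also be scripted. Below is a small Python sketch that parses the table printed by the mysql client and reports any setting that differs from the values shown in this article. The function names are hypothetical, and treating log_bin=ON, binlog_format=ROW, and binlog_row_image=FULL as the required set is an assumption based on the output above.

```python
# Values reported by the article's MySQL instance; assumed here to be the required settings.
REQUIRED = {"log_bin": "ON", "binlog_format": "ROW", "binlog_row_image": "FULL"}


def parse_show_variables(output):
    """Parse the ASCII table printed by the mysql client into a dict."""
    variables = {}
    for line in output.splitlines():
        cells = [cell.strip() for cell in line.strip().strip("|").split("|")]
        # Border lines produce a single cell; the header row is skipped by name.
        if len(cells) == 2 and cells[0] and cells[0] != "Variable_name":
            variables[cells[0]] = cells[1]
    return variables


def find_problems(variables):
    """Return one message per setting that is missing or has an unexpected value."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = variables.get(name)
        if actual is None or actual.upper() != expected:
            problems.append(f"{name} is {actual!r}, expected {expected!r}")
    return problems


SAMPLE_OUTPUT = """\
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| binlog_format            | ROW   |
| binlog_row_image         | FULL  |
| enforce_gtid_consistency | OFF   |
| gtid_mode                | OFF   |
| log_bin                  | ON    |
+--------------------------+-------+
"""

variables = parse_show_variables(SAMPLE_OUTPUT)
problems = find_problems(variables)
print(problems or "binlog settings look fine")
```

An empty problem list means the instance matches the configuration used in this article; otherwise each message names the variable to fix before starting the job.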
Configure Apache SeaTunnel
1. Install the Required Connectors
Add the following connectors to the ${SEATUNNEL_HOME}/config/plugin_config file:
connector-cdc-mysql
connector-jdbc
Then install the connectors.
sh bin/install-plugin.sh
2. Install the MySQL JDBC Driver
This example uses mysql-connector-java-8.0.28.jar.
Copy the JAR file to the ${SEATUNNEL_HOME}/lib/ directory.
Prepare the Test Data
CREATE DATABASE source1 CHARACTER SET utf8mb4;
CREATE DATABASE source2 CHARACTER SET utf8mb4;
CREATE DATABASE source3 CHARACTER SET utf8mb4;
USE source1;
CREATE TABLE `test` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(50) CHARACTER SET utf8mb4,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 CHARACTER SET=utf8mb4;
INSERT INTO test VALUES (1,'张三');
INSERT INTO test VALUES (2,'李四');
USE source2;
CREATE TABLE `test` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(50) CHARACTER SET utf8mb4,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 CHARACTER SET=utf8mb4;
INSERT INTO test VALUES (1,'王五');
INSERT INTO test VALUES (2,'赵六');
USE source3;
CREATE TABLE `test` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(50) CHARACTER SET utf8mb4,
  `sources` VARCHAR(50) CHARACTER SET utf8mb4,
  PRIMARY KEY (`id`, `sources`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 CHARACTER SET=utf8mb4;
Prepare the SeaTunnel Job
env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 20000
}
source {
  MySQL-CDC {
    plugin_output = "source_data1"
    url = "jdbc:mysql://10.0.12.100:3306/source1"
    username = "root"
    password = "root"
    table-names = ["source1.test"]
    startup.mode = initial
  }
  MySQL-CDC {
    plugin_output = "source_data2"
    url = "jdbc:mysql://10.0.12.100:3306/source2"
    username = "root"
    password = "root"
    table-names = ["source2.test"]
    startup.mode = initial
  }
}
transform {
  Sql {
    plugin_input = "source_data1"
    plugin_output = "result1"
    query = "SELECT *, 'source1' AS sources FROM source_data1"
  }
  Sql {
    plugin_input = "source_data2"
    plugin_output = "result2"
    query = "SELECT *, 'source2' AS sources FROM source_data2"
  }
}
sink {
  Jdbc {
    plugin_input = ["result1","result2"]
    url = "jdbc:mysql://10.0.12.100:3306/source3"
    driver = "com.mysql.cj.jdbc.Driver"
    username = "root"
    password = "root"
    database = "source3"
    table = "test"
    generate_sink_sql = true
    primary_keys = ["id","sources"]
  }
}
This example uses a single configuration to synchronize data from two databases. Alternatively, you can configure two separate synchronization jobs.
Two MySQL-CDC source connectors are configured; their outputs are named source_data1 and source_data2.
Two SQL transforms are also configured, one for each source output.
Each SQL transform adds a sources column with a fixed value that identifies the originating database.
The sink consumes the outputs of both SQL transforms and writes the data to the test table. Its primary_keys setting lists the composite key (id, sources), so that updates and deletes from either source are applied to the matching row.
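The destination table must be created manually. Otherwise, the job fails with the following error:
BLOB/TEXT column 'sources' used in key specification without a key length
MySQL raises this error when a TEXT or BLOB column is part of a key without a prefix length, which suggests that the automatically generated table defines sources as a TEXT column. Declaring it as VARCHAR(50) in a manually created table, as in the test data above, avoids the problem.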
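The per-source pattern in the job above (one MySQL-CDC block plus one Sql transform per database) repeats mechanically, so it can be generated when more source databases are involved. The following Python sketch renders the source and transform blocks for a list of database names. The function name render_job_fragment is hypothetical, the host, credentials, and naming scheme are copied from the example job, and the output should be reviewed before use rather than treated as a verified configuration.

```python
def render_job_fragment(databases, host="10.0.12.100", port=3306, table="test"):
    """Render source and transform blocks, one pair per source database."""
    source_blocks, transform_blocks, sink_inputs = [], [], []
    for index, database in enumerate(databases, start=1):
        source_name = f"source_data{index}"
        result_name = f"result{index}"
        source_blocks.append(
            "  MySQL-CDC {\n"
            f'    plugin_output = "{source_name}"\n'
            f'    url = "jdbc:mysql://{host}:{port}/{database}"\n'
            '    username = "root"\n'
            '    password = "root"\n'
            f'    table-names = ["{database}.{table}"]\n'
            "    startup.mode = initial\n"
            "  }\n"
        )
        # The database name doubles as the constant written to the sources column.
        transform_blocks.append(
            "  Sql {\n"
            f'    plugin_input = "{source_name}"\n'
            f'    plugin_output = "{result_name}"\n'
            f"    query = \"SELECT *, '{database}' AS sources FROM {source_name}\"\n"
            "  }\n"
        )
        sink_inputs.append(result_name)
    fragment = (
        "source {\n" + "".join(source_blocks) + "}\n"
        + "transform {\n" + "".join(transform_blocks) + "}\n"
    )
    return fragment, sink_inputs


fragment, sink_inputs = render_job_fragment(["source1", "source2"])
# Line to place inside the Jdbc sink block.
sink_line = "plugin_input = [" + ",".join(f'"{name}"' for name in sink_inputs) + "]"
print(fragment)
print(sink_line)
```

Called with the two databases from this article, the sketch reproduces the source and transform sections of the job; passing a longer list adds one source, one transform, and one sink input per database.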
Test
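1. Start the Job
The command below assumes that the job configuration above is saved as job/mysql_mysql.conf under ${SEATUNNEL_HOME}.
bin/seatunnel.sh --config job/mysql_mysql.conf -m local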
2. Verify the Data
mysql> select * from test;
+----+--------+---------+
| id | name | sources |
+----+--------+---------+
| 1 | 张三 | source1 |
| 1 | 王五 | source2 |
| 2 | 李四 | source1 |
| 2 | 赵六 | source2 |
+----+--------+---------+
3. Modify the Source Data
USE source1;
INSERT INTO test VALUES (3,'钱七');
UPDATE test
SET name='张三1'
WHERE id=1;
USE source2;
DELETE FROM test
WHERE id=1;
4. Verify the Data Again
mysql> SELECT * FROM test;
+----+---------+---------+
| id | name | sources |
+----+---------+---------+
| 1 | 张三1 | source1 |
| 2 | 李四 | source1 |
| 2 | 赵六 | source2 |
| 3 | 钱七 | source1 |
+----+---------+---------+
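The result above follows directly from keying every change on (id, sources). The Python sketch below replays the initial snapshot and the three changes from step 3 against an in-memory SQLite table. SQLite and the helper apply_change are assumptions for illustration only; how SeaTunnel's JDBC sink writes these events internally is not shown in this article, and the upsert here simply models "insert or overwrite the row with this key".

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE test ("
    "id INTEGER NOT NULL, name TEXT, sources TEXT NOT NULL, "
    "PRIMARY KEY (id, sources))"
)


def apply_change(kind, source, row_id, name=None):
    """Apply one change event, tagged with its source, to the destination table."""
    if kind == "delete":
        conn.execute(
            "DELETE FROM test WHERE id = ? AND sources = ?", (row_id, source)
        )
    else:
        # Inserts and updates are both written as an upsert on (id, sources).
        conn.execute(
            "INSERT OR REPLACE INTO test VALUES (?, ?, ?)", (row_id, name, source)
        )


events = [
    # Initial snapshot of both source tables.
    ("insert", "source1", 1, "张三"),
    ("insert", "source1", 2, "李四"),
    ("insert", "source2", 1, "王五"),
    ("insert", "source2", 2, "赵六"),
    # Changes made in step 3.
    ("insert", "source1", 3, "钱七"),
    ("update", "source1", 1, "张三1"),
    ("delete", "source2", 1, None),
]
for event in events:
    apply_change(*event)

rows = conn.execute(
    "SELECT id, name, sources FROM test ORDER BY id, sources"
).fetchall()
for row in rows:
    print(row)
```

The delete from source2 removes only the row (1, source2); the row with id 1 from source1 survives and carries the updated name, which matches the table shown in step 4.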
Summary
The steps above demonstrate the complete process of synchronizing data from tables in two databases into a single table in another database.
I have seen other articles mentioning that the destination table can be created automatically, but I have not been able to reproduce this behavior during my testing.
If you have successfully implemented automatic destination table creation, feel free to share your experience in the comments.
