DEV Community

Apache SeaTunnel

How Apache SeaTunnel Eliminates Primary Key Conflicts When Consolidating Data from Multiple Tables

⭐ Star Apache SeaTunnel on GitHub:
https://github.com/apache/seatunnel

Overview

In many real-world scenarios, multiple databases may contain tables with similar data. For example, data from different business systems may be stored in tables with the same schema but located in separate databases.

When these tables need to be consolidated into a single table for reporting and analytics, a common challenge arises: because the source tables share the same primary key design, directly merging the data results in duplicate primary keys.
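The conflict is easy to reproduce. The following is a minimal sketch, not part of the original setup: it uses Python's built-in sqlite3 module as a stand-in for MySQL (an assumption made only so the example runs anywhere), a hypothetical helper named merge_naively, and the same sample rows as the test data used later in this article.

```python
import sqlite3


def merge_naively(rows_by_source):
    """Insert rows from every source into one table keyed only by id.

    Returns the (source, row) pairs that were rejected as duplicate keys.
    """
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE test (id INTEGER NOT NULL PRIMARY KEY, name TEXT)")
    rejected = []
    for source, rows in rows_by_source.items():
        for row in rows:
            try:
                conn.execute("INSERT INTO test (id, name) VALUES (?, ?)", row)
            except sqlite3.IntegrityError:
                # The id is already taken by a row from another source.
                rejected.append((source, row))
    conn.close()
    return rejected


rows_by_source = {
    "source1": [(1, "张三"), (2, "李四")],
    "source2": [(1, "王五"), (2, "赵六")],
}
rejected = merge_naively(rows_by_source)
print(rejected)
```

In this sketch every row from source2 is rejected, because its id is already used by a row from source1.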

Apache SeaTunnel can resolve this conflict with a small amount of configuration. This article walks through an approach that tags each row with its source and uses a composite primary key in the destination table.

Solution Design

This solution synchronizes the test table from two independent MySQL databases (source1 and source2) into the test table of a third database (source3) in near real time.

To support real-time synchronization and capture data changes, the solution uses Change Data Capture (CDC) based on the MySQL Binary Log (Binlog).

To prevent duplicate auto-increment primary key IDs from different source tables, the destination table introduces an additional sources column. This column, together with the original id, forms a composite primary key, ensuring both data uniqueness and traceability.
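The composite-key idea can be illustrated with a short sketch. This is not the SeaTunnel job itself: it again uses Python's built-in sqlite3 module in place of MySQL (an assumption for portability), and the helper name load is hypothetical. The table mirrors the destination design described above, with id and sources forming the primary key.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE test ("
    " id INTEGER NOT NULL,"
    " name TEXT,"
    " sources TEXT NOT NULL,"
    " PRIMARY KEY (id, sources))"
)


def load(conn, source, rows):
    """Insert (id, name) rows, tagging each one with the name of its source."""
    conn.executemany(
        "INSERT INTO test (id, name, sources) VALUES (?, ?, ?)",
        [(row_id, name, source) for row_id, name in rows],
    )


load(conn, "source1", [(1, "张三"), (2, "李四")])
load(conn, "source2", [(1, "王五"), (2, "赵六")])

merged = conn.execute(
    "SELECT id, name, sources FROM test ORDER BY id, sources"
).fetchall()
for row in merged:
    print(row)
```

All four rows are accepted because rows with the same id now differ in sources, and the sources value records where each row came from.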

Prerequisites

  1. MySQL 5.7

  2. Apache SeaTunnel 2.3.12

Verify the MySQL Configuration

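First, verify that binary logging (binlog) is enabled on the source MySQL instance, and that binlog_format is ROW and binlog_row_image is FULL, as in the output below.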

mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');

+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| binlog_format            | ROW   |
| binlog_row_image         | FULL  |
| enforce_gtid_consistency | OFF   |
| gtid_mode                | OFF   |
| log_bin                  | ON    |
+--------------------------+-------+
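If you want to automate this check, the comparison can be sketched as follows. This is only an illustration: the function name find_binlog_problems is hypothetical, the expected values are simply the ones shown in the output above, and fetching the variables from MySQL is left out so the sketch has no external dependencies.

```python
# Expected values, taken from the sample output above (an assumption of this sketch).
REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def find_binlog_problems(variables):
    """Return a message for each setting that is missing or differs from REQUIRED.

    `variables` maps variable names to values, for example the rows returned
    by the SHOW VARIABLES statement above.
    """
    problems = []
    for name, expected in REQUIRED.items():
        actual = variables.get(name)
        if actual is None or actual.upper() != expected:
            problems.append(f"{name} is {actual!r}, expected {expected!r}")
    return problems


current = {
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
    "enforce_gtid_consistency": "OFF",
    "gtid_mode": "OFF",
    "log_bin": "ON",
}
print(find_binlog_problems(current))
```

An empty list means the three settings match the values shown above.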

If log_bin is not ON or binlog_format is not ROW, add the following settings to the [mysqld] section of the MySQL configuration file (typically my.cnf):

log-bin=mysql-bin
server-id=1
binlog_format=ROW
binlog_checksum=NONE

Restart the MySQL service after updating the configuration.

Configure Apache SeaTunnel

1. Install the Required Connectors

Add the following connectors to the ${SEATUNNEL_HOME}/config/plugin_config file:

connector-cdc-mysql
connector-jdbc

Then install the connectors.

sh bin/install-plugin.sh

2. Install the MySQL JDBC Driver

This example uses mysql-connector-java-8.0.28.jar.

Copy the JAR file to the ${SEATUNNEL_HOME}/lib/ directory.

Prepare the Test Data

CREATE DATABASE source1 CHARACTER SET utf8mb4;
CREATE DATABASE source2 CHARACTER SET utf8mb4;
CREATE DATABASE source3 CHARACTER SET utf8mb4;

USE source1;

CREATE TABLE `test` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(50) CHARACTER SET utf8mb4,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 CHARACTER SET=utf8mb4;

INSERT INTO test VALUES (1,'张三');
INSERT INTO test VALUES (2,'李四');

USE source2;

CREATE TABLE `test` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(50) CHARACTER SET utf8mb4,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 CHARACTER SET=utf8mb4;

INSERT INTO test VALUES (1,'王五');
INSERT INTO test VALUES (2,'赵六');

USE source3;

CREATE TABLE `test` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(50) CHARACTER SET utf8mb4,
  `sources` VARCHAR(50) CHARACTER SET utf8mb4,
  PRIMARY KEY (`id`, `sources`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 CHARACTER SET=utf8mb4;

Prepare the SeaTunnel Job

env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 20000
}

source {

  MySQL-CDC {
    plugin_output = "source_data1"
    url = "jdbc:mysql://10.0.12.100:3306/source1"
    username = "root"
    password = "root"
    table-names = ["source1.test"]
    startup.mode = initial
  }

  MySQL-CDC {
    plugin_output = "source_data2"
    url = "jdbc:mysql://10.0.12.100:3306/source2"
    username = "root"
    password = "root"
    table-names = ["source2.test"]
    startup.mode = initial
  }

}

transform {

  Sql {
    plugin_input = "source_data1"
    plugin_output = "result1"
    query = "SELECT *, 'source1' AS sources FROM source_data1"
  }

  Sql {
    plugin_input = "source_data2"
    plugin_output = "result2"
    query = "SELECT *, 'source2' AS sources FROM source_data2"
  }

}

sink {

  Jdbc {
    plugin_input = ["result1","result2"]
    url = "jdbc:mysql://10.0.12.100:3306/source3"
    driver = "com.mysql.cj.jdbc.Driver"
    username = "root"
    password = "root"
    database = "source3"
    table = "test"
    generate_sink_sql = true
    primary_keys = ["id","sources"]
  }

}

This example uses a single configuration to synchronize data from two databases. Alternatively, you can configure two separate synchronization jobs.

Two MySQL-CDC sources are configured. Their outputs are named source_data1 and source_data2.

Two SQL transforms are also configured, one for each source output.

Each SQL transform adds a sources field with a fixed value that identifies the originating database.

The sink consumes the outputs of both SQL transforms and writes the data to the test table. Setting primary_keys to the composite key (id, sources) lets the sink match update and delete events to the correct destination rows.
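To make the role of the composite key concrete, here is a simplified model of the flow. It is not SeaTunnel's implementation: the functions tag and apply_event and the operation names "insert", "update", and "delete" are hypothetical, and the destination table is modeled as a plain dictionary keyed by (id, sources).

```python
def tag(source, row):
    """Mimic the SQL transform: add a constant 'sources' value to a row."""
    return {**row, "sources": source}


def apply_event(table, op, row):
    """Apply one change event to a dict keyed by the composite key (id, sources)."""
    key = (row["id"], row["sources"])
    if op in ("insert", "update"):
        table[key] = row  # upsert: insert the row or overwrite the existing one
    elif op == "delete":
        table.pop(key, None)
    else:
        raise ValueError(f"unknown operation: {op}")


table = {}
events = [
    ("source1", "insert", {"id": 1, "name": "张三"}),
    ("source2", "insert", {"id": 1, "name": "王五"}),
    ("source1", "update", {"id": 1, "name": "张三1"}),
    ("source2", "delete", {"id": 1, "name": "王五"}),
]
for source, op, row in events:
    apply_event(table, op, tag(source, row))

print(table)
```

Because the key includes sources, the update from source1 and the delete from source2 each touch only their own row, even though both rows have id 1.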

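The destination table must be created manually. In my testing, letting the sink create it failed with the following error, which indicates that the sources column was created as a BLOB/TEXT type. MySQL cannot use such a column in a primary key without a prefix length: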

BLOB/TEXT column 'sources' used in key specification without a key length

Test

1. Start the Job

bin/seatunnel.sh --config job/mysql_mysql.conf -m local

2. Verify the Data

mysql> select * from test;

+----+--------+---------+
| id | name   | sources |
+----+--------+---------+
|  1 | 张三   | source1 |
|  1 | 王五   | source2 |
|  2 | 李四   | source1 |
|  2 | 赵六   | source2 |
+----+--------+---------+

3. Modify the Source Data

USE source1;

INSERT INTO test VALUES (3,'钱七');

UPDATE test
SET name='张三1'
WHERE id=1;

USE source2;

DELETE FROM test
WHERE id=1;

4. Verify the Data Again

mysql> SELECT * FROM test;

+----+---------+---------+
| id | name    | sources |
+----+---------+---------+
|  1 | 张三1   | source1 |
|  2 | 李四    | source1 |
|  2 | 赵六    | source2 |
|  3 | 钱七    | source1 |
+----+---------+---------+

Summary

The steps above demonstrate the complete process of synchronizing data from tables in two databases into a single table in another database.

I have seen other articles mentioning that the destination table can be created automatically, but I have not been able to reproduce this behavior during my testing.

If you have successfully implemented automatic destination table creation, feel free to share your experience in the comments.
