Data Migration from MySQL to OceanBase Using Apache SeaTunnel

In this guide, we will walk you through a lightweight data migration and synchronization solution from MySQL to OceanBase using Apache SeaTunnel (SeaTunnel for short). We will leverage its built-in Zeta engine, which supports full data synchronization, offline incremental synchronization, and Change Data Capture (CDC) solutions.

Preparing the Runtime Environment

Before we begin, ensure that your environment is ready.

Install Java

SeaTunnel requires Java 8 or higher. While Java 8 is recommended, later versions should work as well.

After installation, verify that Java is correctly configured by running:

root:~# java -version
openjdk version "17.0.12" 2024-07-16
OpenJDK Runtime Environment (build 17.0.12+7-Debian-2deb11u1)
OpenJDK 64-Bit Server VM (build 17.0.12+7-Debian-2deb11u1, mixed mode, sharing)

Make sure that JAVA_HOME is properly set.
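For example, on a Debian-based system you might set it like this (the JDK path below is an assumption; adjust it to your actual installation):

# Hypothetical JDK path; point JAVA_HOME at whatever JDK you installed
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH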

Download and Install Apache SeaTunnel

Visit the official SeaTunnel website to download the latest version.

For this guide, we will use version 2.3.9:

# Download
wget https://dlcdn.apache.org/seatunnel/2.3.9/apache-seatunnel-2.3.9-bin.tar.gz
# Extract
tar -zxvf apache-seatunnel-2.3.9-bin.tar.gz

Installing Connector Plugins

SeaTunnel’s installation package only contains the core framework and the Zeta engine. To connect to various data sources, you need to manually download and configure the required plugins.

Automatic Plugin Installation

To automatically download the necessary connectors, modify the config/plugin_config file and specify the required connectors. By default, the file includes all connectors, but for this guide, we only include the essential ones:

connector-cdc-mysql
connector-jdbc
connector-fake
connector-console

Run the following command to install the plugins:

sh bin/install-plugin.sh 2.3.9

Manual Plugin Installation

Alternatively, you can manually download the required plugins from the Apache Maven Repository.

Download the necessary .jar files, for example:

connector-cdc-mysql-2.3.9.jar
connector-console-2.3.9.jar
connector-fake-2.3.9.jar
connector-jdbc-2.3.9.jar
seatunnel-transforms-v2-2.3.9.jar

After downloading, move the files into the connectors directory.
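As a sketch, one of these jars can be fetched from Maven Central like this; the URL layout and the ./connectors/ target below are assumptions, so verify them against the repository and your installation before relying on them:

# Assumed Maven Central layout for org.apache.seatunnel artifacts; the other connectors follow the same pattern
wget https://repo1.maven.org/maven2/org/apache/seatunnel/connector-jdbc/2.3.9/connector-jdbc-2.3.9.jar -P ./connectors/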

Verifying Connector Installation

To check if the connectors are installed correctly, run:

./bin/seatunnel-connector.sh -l

Source
FakeSource MySQL-CDC Jdbc


Sink
Jdbc Console


Transform
Copy DynamicCompile FieldMapper Filter FilterRowKind JsonPath LLM Replace Split Sql

Since we will be using the JDBC connector (MySQL protocol) to interact with OceanBase, you also need to download the MySQL JDBC driver from the official MySQL website.

Once downloaded, place the mysql-connector-j-9.0.0.jar file into the {seatunnel}/lib directory.
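A minimal sketch, assuming you fetch the driver from Maven Central instead of the MySQL website and that SeaTunnel is extracted to apache-seatunnel-2.3.9 (both are assumptions; adjust to your setup):

# Hypothetical paths; adjust to your installation directory
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/9.0.0/mysql-connector-j-9.0.0.jar
mv mysql-connector-j-9.0.0.jar apache-seatunnel-2.3.9/lib/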

Verifying SeaTunnel Installation

To confirm that SeaTunnel is installed correctly, execute a batch processing test using the default configuration template:

./bin/seatunnel.sh --config ./config/v2.batch.config.template -m local

Command Explanation

  • seatunnel.sh → the standard SeaTunnel startup script
  • --config → specifies the job configuration file
  • -m local → runs the job in local mode

If everything is working correctly, you should see output similar to this:

2022-12-19 11:01:45,417 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - output rowType: name<STRING>, age<INT>
2022-12-19 11:01:46,489 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1:  SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CpiOd, 8520946
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: hBoib, 929089763
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: GSvzm, 827085798
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: NNAYI, 94307133
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: EexFl, 1823689599
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=14: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CBXUb, 869582787
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=15: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: Wbxtm, 1469371353
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=16: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: mIJDt, 995616438

At the end of the job execution, you will see a summary log:

***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-08-29 22:45:29
End Time                  : 2024-08-29 22:45:33
Total Time(s)             :                   4
Total Read Count          :                  32
Total Write Count         :                  32
Total Failed Count        :                   0
***********************************************

This confirms that SeaTunnel is working correctly.

Full Data Synchronization

Creating the Test Table

To verify full data synchronization, we create test tables in both MySQL and OceanBase.

Step 1: Creating the MySQL Table

CREATE TABLE `table1` (
    `id` INT NOT NULL AUTO_INCREMENT,
    `value1` VARCHAR(255) NOT NULL,
    `value2` VARCHAR(255),
    `value3` VARCHAR(255),
    `value4` VARCHAR(255),
    `value5` VARCHAR(255),
    `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    UNIQUE INDEX `idx_value1` (`value1`),
    INDEX `idx_value2_value3` (`value2`, `value3`),
    INDEX `idx_value3_value4_value5` (`value3`, `value4`, `value5`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `table2` (
    `id` INT NOT NULL AUTO_INCREMENT,
    `value1` VARCHAR(255) NOT NULL,
    `value2` VARCHAR(255),
    `value3` VARCHAR(255),
    `value4` VARCHAR(255),
    `value5` VARCHAR(255),
    `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    UNIQUE INDEX `idx_value1` (`value1`),
    INDEX `idx_value2_value3` (`value2`, `value3`),
    INDEX `idx_value3_value4_value5` (`value3`, `value4`, `value5`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

We used Navicat to insert 100,000 test records into each table; a plain SQL alternative is sketched below.
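If you prefer not to use a GUI tool, something like the following recursive CTE (MySQL 8.0+; the generated values are placeholders) can produce the test data. Repeat it for table2:

-- Raise the recursion limit for this session (default is 1000)
SET SESSION cte_max_recursion_depth = 100000;
INSERT INTO `table1` (`value1`, `value2`, `value3`, `value4`, `value5`)
SELECT CONCAT('v1-', n), CONCAT('v2-', n), CONCAT('v3-', n), CONCAT('v4-', n), CONCAT('v5-', n)
FROM (
    WITH RECURSIVE seq (n) AS (
        SELECT 1
        UNION ALL
        SELECT n + 1 FROM seq WHERE n < 100000
    )
    SELECT n FROM seq
) AS t;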

Configuring SeaTunnel for Full Synchronization

Full Data Synchronization Configuration File

Note: We recommend manually migrating the table schema, since automatic migration may encounter issues and does not create indexes.

Single-Table Full Sync

env {
  parallelism = 5
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "xxx"
        password = "xxx"
        query = "select * from seatunnel.table1"
    }
}
sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        # Automatically generate SQL statements
        generate_sink_sql = true
        database = seatunnel
        table = table1
    }
}
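To launch the job, run it the same way as the test job; the config file name below is just an assumed example:

# Assuming the job above was saved as ./config/mysql_to_ob_full.conf
./bin/seatunnel.sh --config ./config/mysql_to_ob_full.conf -m local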

Result:

***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-08-30 15:05:39
End Time                  : 2024-08-30 15:05:47
Total Time(s)             :                   8
Total Read Count          :              100000
Total Write Count         :              100000
Total Failed Count        :                   0
***********************************************

Multi-Table Full Extraction

env {
  parallelism = 5
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "xxx"
        password = "xxx"
        table_list = [
          {
            table_path = "seatunnel.table1"
          },
          {
            table_path = "seatunnel.table2"
            query = "select * from seatunnel.table2 where id > 100"
          }
        ]
    }
}
sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        # Automatically generate SQL statements
        generate_sink_sql = true
        database = seatunnel
        table_list = \["seatunnel.table1", "seatunnel.table2"\]
    }
}

Result:

***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-08-30 15:10:09
End Time                  : 2024-08-30 15:10:20
Total Time(s)             :                  10
Total Read Count          :              200000
Total Write Count         :              200000
Total Failed Count        :                   0
***********************************************

Incremental Synchronization Configuration File

For incremental sync, a simple approach is to use a query that filters on an id or update-time column (updated_at in our test tables).

env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "xxx"
        password = "xxx"
        query = "SELECT * FROM seatunnel.table1 WHERE updatetime > '2024-01-01' "
    }
}
sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        generate_sink_sql = true
        database = seatunnel
        table = table1 
    }
}

Note: The sink will perform insert and update operations based on the primary key. However, manually updating the configuration file for each incremental run can be cumbersome. We recommend using Apache DolphinScheduler in conjunction with SeaTunnel to create a workflow. With DolphinScheduler, you can obtain the maximum timestamp or id from the sink and pass it as a workflow variable.

Example configuration with a variable:

env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "xxx"
        password = "xxx"
        query = "SELECT * FROM seatunnel.table1 WHERE updatetime > ${max_id} "
    }
}
sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        generate_sink_sql = true
        database = seatunnel
        table = table1 
    }
}
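The SeaTunnel CLI supports simple variable substitution via the -i option, so DolphinScheduler (or a small wrapper script) can resolve the high-water mark from the sink and inject it at runtime. A minimal sketch, assuming an id-based watermark and a hypothetical config file name:

# Read the current maximum id from the OceanBase sink (credentials and port as in the configs above)
MAX_ID=$(mysql -h 127.0.0.1 -P 2883 -u 'xxx@xxx' -pxxx -N -e "SELECT COALESCE(MAX(id), 0) FROM seatunnel.table1")
# Pass it into the job; ${max_id} in the config is replaced with this value
./bin/seatunnel.sh --config ./config/mysql_to_ob_incremental.conf -m local -i max_id=${MAX_ID}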

The multi-table configuration is similar.

CDC Synchronization Configuration File

Manual Table Schema Migration

Due to issues with SeaTunnel’s OceanBase component, schema migration can be error-prone. It is recommended to migrate the table schema manually.

Check MySQL Binlog Status

Grant the necessary privileges to the user:

mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
mysql> FLUSH PRIVILEGES;
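Note that on MySQL 8.0 and later, GRANT ... IDENTIFIED BY is no longer supported; create the user first and then grant the privileges:

mysql> CREATE USER 'user'@'%' IDENTIFIED BY 'password';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';
mysql> FLUSH PRIVILEGES;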

Verify that the binlog is enabled:

mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name            | Value          |
+--------------------------+----------------+
| binlog_format            | ROW            |
| binlog_row_image         | FULL           |
| enforce_gtid_consistency | ON             |
| gtid_mode                | ON             |
| log_bin                  | ON             |
+--------------------------+----------------+

If the settings are not as above, please adjust your my.cnf file accordingly. Note that when creating a consistent snapshot of a large database, read timeouts may occur; please configure interactive_timeout and wait_timeout as needed.
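A minimal example of the relevant [mysqld] settings (the values are illustrative assumptions; tune them for your environment):

[mysqld]
server-id                = 1
log_bin                  = mysql-bin
binlog_format            = ROW
binlog_row_image         = FULL
gtid_mode                = ON
enforce_gtid_consistency = ON
# Optional: raise timeouts to avoid read timeouts during the initial consistent snapshot
interactive_timeout      = 28800
wait_timeout             = 28800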

After preparing the environment, write the configuration file.

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/mysql"
    username = "xxx"
    password = "xxx@xxx"
    table-names = ["seatunnel.table1", "seatunnel.table2"]
    startup.mode = "initial"
  }
}

sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        database = "seatunnel"  \# Target database
        table-names = \["seatunnel.table1", "seatunnel.table2"\]
        generate\_sink\_sql = true     \# Automatically generate SQL
    }
}

Once started, the job will first perform a historical data migration, then process CDC changes.

Important Note:

Upon startup, SeaTunnel will execute different operations based on the configured tables and the startup.mode setting. The startup.mode options are as follows:

  • **initial**: Synchronizes historical data first, then incremental data.
  • **earliest**: Starts from the earliest offset.
  • **latest**: Starts from the latest offset.
  • **specific**: Starts from a user-provided specific offset.

If you use specific, you must also provide the binlog offset file (startup.specific-offset.file) and the offset position (startup.specific-offset.pos).
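For example, a source sketch with a specific offset (the binlog file name and position below are placeholders; read the real values with SHOW MASTER STATUS on the source):

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/mysql"
    username = "xxx"
    password = "xxx@xxx"
    table-names = ["seatunnel.table1", "seatunnel.table2"]
    startup.mode = "specific"
    # Placeholder offset values
    startup.specific-offset.file = "mysql-bin.000003"
    startup.specific-offset.pos = 4
  }
}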

Conclusion

This article has detailed how to configure full, incremental, and CDC synchronization using Apache SeaTunnel. We covered:

  • Full sync configuration for single and multi-table extraction.
  • Incremental sync configuration using query filters (with an option to integrate with Apache DolphinScheduler).
  • CDC sync configuration, including prerequisites like binlog verification.

By following these steps, you can achieve a complete, end-to-end data migration and synchronization solution. Thank you for reading, and please share your feedback! 🚀
