When migrating data from MySQL (including MySQL-compatible databases such as Amazon Aurora) to VeloDB, Flink can serve as the real-time synchronization engine that keeps the data consistent and up to date. Its high-throughput, low-latency stream processing handles both the full initial load and ongoing incremental changes efficiently.
For real-time synchronization, enable the MySQL Binlog so that CDC (Change Data Capture) events can be captured. Whether you run a self-hosted MySQL or Amazon Aurora MySQL in the cloud, you can enable the Binlog and subscribe to it with Flink CDC to achieve:
- Full initial load: import the existing data from MySQL/Aurora into VeloDB first
- Real-time synchronization of incremental changes: capture Insert/Update/Delete operations from the Binlog and continuously write them to VeloDB
The overall pipeline is: MySQL/Aurora Binlog → Flink CDC → Flink Doris Connector → VeloDB.
The following example uses Amazon Aurora MySQL to show how Flink CDC captures data changes in Aurora and synchronizes them to VeloDB in real time.
Example
1. Create an AWS RDS Aurora MySQL instance
2. Create a MySQL database and corresponding tables
CREATE DATABASE test_db;
CREATE TABLE test_db.student (
    id INT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    age INT,
    email VARCHAR(255),
    phone VARCHAR(20),
    score DECIMAL(5,2),
    created_at TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO test_db.student (id, name, age, email, phone, score, created_at)
VALUES
(1, 'Alice Zhang', 22, 'alice@example.com', '13800138000', 89.50, NOW()),
(2, 'Bob Li', 21, 'bob@example.com', '13900139000', 76.80, NOW()),
(3, 'Charlie Wang', 23, 'charlie@example.com', '13600136000', 92.00, NOW()),
(4, 'David Chen', 20, 'david@example.com', '13500135000', 85.60, NOW()),
(5, 'Emma Liu', 22, 'emma@example.com', '13700137000', 78.90, NOW());
3. Create a VeloDB warehouse
4. Modify MySQL configuration
- Create a parameter group and add the binlog configuration
- Set binlog_format to ROW
- Replace the cluster's DB cluster parameter group with the one you just created, apply the change, and then restart the service
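After the restart, it can help to confirm that the settings took effect before starting Flink. The sketch below is one way to do that with the standard mysql command-line client. The helper names show_binlog_settings and binlog_ready are hypothetical, and the host and user are placeholders you supply.

```shell
# Hypothetical helper: prints the binlog variables that Flink CDC relies on.
# $1 = MySQL/Aurora host, $2 = user. The mysql client prompts for the password (-p).
# -B (batch) and -N (skip column names) give tab-separated "name<TAB>value" lines.
show_binlog_settings() {
  mysql -h "$1" -P 3306 -u "$2" -p -B -N \
    -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format');"
}

# Succeeds only if the given output reports log_bin=ON and binlog_format=ROW.
binlog_ready() {
  printf '%s\n' "$1" | grep -Eq '^log_bin[[:space:]]+ON$' &&
    printf '%s\n' "$1" | grep -Eq '^binlog_format[[:space:]]+ROW$'
}
```

For example, binlog_ready "$(show_binlog_settings <your-aurora-endpoint> admin)" returns success once both values are as expected.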
5. Install Flink with the Doris Connector
5.1 Download the prebuilt installation package
We provide a prebuilt installation package based on Flink 1.17 that you can download and extract directly.
5.2 Manual installation
If you already have a Flink environment or need a different Flink version, install manually. Using Flink 1.17 as an example, download the Flink installation package and its dependencies:
- Flink 1.17
- Flink MySQL CDC Connector
- Flink Doris Connector
- MySQL Driver
After the download is complete, extract the Flink installation package:
tar -zxvf flink-1.17.2-bin-scala_2.12.tgz
Then place the Flink MySQL CDC Connector, the Doris Connector, and the MySQL driver jars in the flink-1.17.2-bin/lib directory.
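A missing jar usually only surfaces when the job is submitted, so a quick check of the lib directory can save a round trip. The sketch below is a minimal example. The helper name check_flink_libs is hypothetical, and the file-name patterns are assumptions about how the downloaded artifacts are named, so adjust them to match your files.

```shell
# Hypothetical helper: reports whether each required jar is present in a Flink lib directory.
# The body runs in a subshell "( ... )" so its variables do not leak into the caller.
check_flink_libs() (
  lib_dir=$1
  missing=0
  # Assumed file-name patterns; change them if your artifacts are named differently.
  for pattern in 'flink-sql-connector-mysql-cdc-*.jar' \
                 'flink-doris-connector-*.jar' \
                 'mysql-connector-*.jar'; do
    # ls fails when the glob matches nothing, which marks the jar as missing.
    if ls "$lib_dir"/$pattern >/dev/null 2>&1; then
      echo "found:   $pattern"
    else
      echo "missing: $pattern"
      missing=1
    fi
  done
  # Exits the subshell only; the function's status is 0 when every jar was found.
  exit "$missing"
)
```

Running check_flink_libs flink-1.17.2-bin/lib prints one line per expected jar and returns a non-zero status if any is missing.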
6. Submit the Flink synchronization job
When the job is submitted, the Doris Connector automatically creates the corresponding tables in VeloDB based on the table structure of the upstream MySQL database.
Flink supports submitting and running jobs in Local, Standalone, Yarn, and K8S modes. If you already have a Flink environment, you can submit the job directly to it.
6.1 Local Environment
> cd flink-1.17.2-bin
> bin/flink run -t local \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=database-test.cluster-ro-ckbuyoqerz2c.us-east-1.rds.amazonaws.com \
--mysql-conf port=3306 \
--mysql-conf username=admin \
--mysql-conf password=123456 \
--mysql-conf database-name=test_db \
--including-tables "student" \
--sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
--sink-conf sink.label-prefix=label
6.2 Standalone Environment
> cd flink-1.17.2-bin
> bin/flink run -t remote \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=database-test.cluster-ro-ckbuyoqerz2c.us-east-1.rds.amazonaws.com \
--mysql-conf port=3306 \
--mysql-conf username=admin \
--mysql-conf password=123456 \
--mysql-conf database-name=test_db \
--including-tables "student" \
--sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
--sink-conf sink.label-prefix=label
6.3 Yarn Environment
> cd flink-1.17.2-bin
> bin/flink run -t yarn-per-job \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=database-test.cluster-ro-ckbuyoqerz2c.us-east-1.rds.amazonaws.com \
--mysql-conf port=3306 \
--mysql-conf username=admin \
--mysql-conf password=123456 \
--mysql-conf database-name=test_db \
--including-tables "student" \
--sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
--sink-conf sink.label-prefix=label
6.4 K8S Environment
> cd flink-1.17.2-bin
> bin/flink run -t kubernetes-session \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=database-test.cluster-ro-ckbuyoqerz2c.us-east-1.rds.amazonaws.com \
--mysql-conf port=3306 \
--mysql-conf username=admin \
--mysql-conf password=123456 \
--mysql-conf database-name=test_db \
--including-tables "student" \
--sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
--sink-conf sink.label-prefix=label
Note: For more Connector parameters, refer to the Flink Doris Connector documentation.
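The four commands in this section differ only in the value passed to -t, so they can be folded into one function. The sketch below is one possible way to do that. The function name submit_sync_job and the environment variables (FLINK_BIN, CONNECTOR_JAR, MYSQL_HOST, VELODB_HOST, MYSQL_PASSWORD, VELODB_PASSWORD) are hypothetical names introduced here, and the endpoint defaults are placeholders. The MySQL database-name is set to test_db, the database created in step 2.

```shell
# Sketch: one function covering the four submission targets used in this section.
# All values below are placeholders (assumptions); override them in your environment.
FLINK_BIN=${FLINK_BIN:-bin/flink}
CONNECTOR_JAR=${CONNECTOR_JAR:-lib/flink-doris-connector-1.17-25.1.0.jar}
MYSQL_HOST=${MYSQL_HOST:-your-aurora-endpoint}
VELODB_HOST=${VELODB_HOST:-your-velodb-endpoint}

submit_sync_job() {
  target=$1
  # Accept only the targets shown in this section.
  case $target in
    local|remote|yarn-per-job|kubernetes-session) ;;
    *) echo "unsupported target: $target" >&2; return 1 ;;
  esac
  # Passwords come from the environment so they stay out of the script;
  # ${VAR:?message} aborts with the message if the variable is unset or empty.
  "$FLINK_BIN" run -t "$target" \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    "$CONNECTOR_JAR" \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname="$MYSQL_HOST" \
    --mysql-conf port=3306 \
    --mysql-conf username=admin \
    --mysql-conf password="${MYSQL_PASSWORD:?set MYSQL_PASSWORD}" \
    --mysql-conf database-name=test_db \
    --including-tables "student" \
    --sink-conf fenodes="$VELODB_HOST:8080" \
    --sink-conf username=admin \
    --sink-conf password="${VELODB_PASSWORD:?set VELODB_PASSWORD}" \
    --sink-conf jdbc-url="jdbc:mysql://$VELODB_HOST:9030" \
    --sink-conf sink.label-prefix=label
}
```

From the Flink directory, submit_sync_job local or submit_sync_job yarn-per-job then submits the same job to the chosen target. Setting FLINK_BIN=echo prints the arguments instead of submitting, which is a cheap way to review the command first.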
7. Verify Historical Data Synchronization
On its first run, the Flink job synchronizes the full historical data. Check the synchronization status in VeloDB.
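One simple check is to compare row counts on both sides. The sketch below assumes the mysql command-line client can reach both endpoints (Aurora on port 3306, VeloDB on the MySQL-protocol port 9030 used in the jdbc-url above) and that the admin user is used on both. The helper names count_students and compare_counts are hypothetical.

```shell
# Hypothetical helper: prints the row count of test_db.student on a MySQL-protocol endpoint.
# $1 = host, $2 = port. The mysql client prompts for the password (-p).
count_students() {
  mysql -h "$1" -P "$2" -u admin -p -B -N \
    -e "SELECT COUNT(*) FROM test_db.student;"
}

# Compares the source and target counts and reports whether they match.
compare_counts() {
  if [ "$1" -eq "$2" ]; then
    echo "in sync: $1 rows"
  else
    echo "mismatch: source=$1 target=$2"
    return 1
  fi
}
```

For example, compare_counts "$(count_students <aurora-endpoint> 3306)" "$(count_students <velodb-endpoint> 9030)" should report 5 rows on both sides once the rows inserted in step 2 have been loaded.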
8. Verify Real-Time Data Synchronization
Perform data modifications in MySQL:
INSERT INTO student (id, name, age, email, phone, score, created_at)
VALUES
(6, 'Frank Zhao', 24, 'frank@example.com', '13400134000', 88.75, NOW());
DELETE FROM student WHERE id = 3;
UPDATE student
SET score = 95.00, age = 23
WHERE id = 2;
Verify data changes in VeloDB:
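The three statements above should leave VeloDB with a new row for id 6, no row for id 3, and age 23 and score 95.00 for id 2. The sketch below checks exactly that. It assumes the mysql command-line client, the admin user, and the MySQL-protocol port 9030 from the jdbc-url used earlier. The helper names fetch_students and verify_changes are hypothetical. Changes may take up to a checkpoint interval to become visible, so a failed check right after the modification is not necessarily an error.

```shell
# Hypothetical helper: prints id, age, and score for every row, tab-separated.
# $1 = VeloDB host. The mysql client prompts for the password (-p).
fetch_students() {
  mysql -h "$1" -P 9030 -u admin -p -B -N \
    -e "SELECT id, age, score FROM test_db.student ORDER BY id;"
}

# Succeeds only if all three changes are visible in the given rows:
# id 6 was inserted, id 3 was deleted, and id 2 has age 23 and score 95.
verify_changes() {
  printf '%s\n' "$1" | awk -F '\t' '
    $1 == 6 { inserted = 1 }
    $1 == 3 { stale_row = 1 }
    $1 == 2 && $2 == 23 && $3 == 95 { updated = 1 }
    END { exit !(inserted && !stale_row && updated) }
  '
}
```

For example, verify_changes "$(fetch_students <velodb-endpoint>)" && echo "changes synchronized" prints the message once the insert, delete, and update have all arrived.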