Apache Doris

Overview of Real-Time Data Synchronization from MySQL to VeloDB

When migrating data from MySQL (including MySQL-compatible databases such as Amazon Aurora) to VeloDB, Flink can serve as the real-time synchronization engine, keeping the two systems consistent with low latency. Flink's high-throughput, low-latency stream processing handles both the initial full load and the ongoing incremental changes.

For real-time synchronization, enable the MySQL Binlog so that changes can be captured as CDC (Change Data Capture) events. Whether you run a self-hosted MySQL or Amazon Aurora MySQL in the cloud, you can enable the Binlog and subscribe to it with Flink CDC to achieve:

  • Full data initial load: Import existing data from MySQL/Aurora to VeloDB first

  • Real-time synchronization of incremental changes: Capture Insert/Update/Delete operations based on Binlog and continuously write them to VeloDB

The overall pipeline is: MySQL/Aurora Binlog → Flink CDC → Flink Doris Connector → VeloDB.

Here, we use Amazon Aurora MySQL as an example to demonstrate how to use Flink CDC to capture data changes in Aurora and synchronize them to VeloDB in real time.

Example

1. Create an AWS RDS Aurora MySQL instance

2. Create a MySQL database and corresponding tables

CREATE DATABASE test_db;
CREATE TABLE test_db.student (
    id INT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    age INT ,
    email VARCHAR(255) ,
    phone VARCHAR(20) ,
    score DECIMAL(5,2) ,
    created_at TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO test_db.student (id, name, age, email, phone, score, created_at) 
VALUES 
(1, 'Alice Zhang', 22, 'alice@example.com', '13800138000', 89.50, NOW()),
(2, 'Bob Li', 21, 'bob@example.com', '13900139000', 76.80, NOW()),
(3, 'Charlie Wang', 23, 'charlie@example.com', '13600136000', 92.00, NOW()),
(4, 'David Chen', 20, 'david@example.com', '13500135000', 85.60, NOW()),
(5, 'Emma Liu', 22, 'emma@example.com', '13700137000', 78.90, NOW());

3. Create a VeloDB warehouse

4. Modify MySQL configuration

  1. Create a DB cluster parameter group to hold the binlog configuration

  2. In that parameter group, set binlog_format to ROW

  3. Replace the cluster's DB Cluster Parameter group with the one you just created, apply the changes, and then restart the instance so the setting takes effect
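
After the restart, it can help to confirm that the new parameter group actually took effect before starting Flink. The following is a minimal sketch rather than part of the original walkthrough: it assumes the `mysql` command-line client is installed, and the helper names `show_binlog_settings` and `binlog_format_is_row` are made up for illustration. Debezium-based connectors such as Flink CDC generally also expect `binlog_row_image` to be `FULL` (the MySQL default), so the query prints that variable as well.

```shell
# Hypothetical helper: print the binlog-related variables that CDC depends on.
# Arguments: host, user. The mysql client prompts for the password (-p).
# -B forces tab-separated batch output, -N suppresses the header row.
show_binlog_settings() {
    mysql -h "$1" -P 3306 -u "$2" -p -B -N -e \
        "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image');"
}

# Hypothetical helper: succeeds only if the captured output reports ROW format.
binlog_format_is_row() {
    printf '%s\n' "$1" | grep -q '^binlog_format[[:space:]][[:space:]]*ROW$'
}
```

A possible way to use it: capture the output with `out=$(show_binlog_settings <aurora-endpoint> admin)`, then run `binlog_format_is_row "$out" && echo "binlog_format is ROW"`.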

5. Install Flink with the Doris Connector

5.1 Download the prebuilt installation package

We provide a prebuilt installation package based on Flink 1.17 that can be downloaded and extracted directly.

5.2 Manual installation

If you already have a Flink environment or need a different Flink version, install manually instead. Using Flink 1.17 as the example, download the Flink installation package and its dependencies:

  • Flink 1.17

  • Flink MySQL CDC Connector

  • Flink Doris Connector

  • MySQL Driver

After the download is complete, extract the Flink installation package:

tar -zxvf flink-1.17.2-bin-scala_2.12.tgz

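Then place the Flink MySQL CDC Connector, the Doris Connector, and the MySQL driver JARs into the flink-1.17.2-bin/lib directory. If your extracted directory has a different name (the official Apache tarball typically extracts to flink-1.17.2), adjust the path accordingly.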
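
As a rough sketch of that copy step, the helper below places a list of JARs into a Flink lib directory and lists the result. The function name `install_jars` is hypothetical, and the exact JAR file names for the CDC connector and the MySQL driver depend on the versions you downloaded, so they are passed in as arguments rather than hard-coded.

```shell
# Hypothetical helper: copy connector and driver JARs into a Flink lib directory.
# Usage: install_jars <flink_home> <jar>...
install_jars() {
    flink_home=$1
    shift
    if [ ! -d "$flink_home/lib" ]; then
        echo "no lib directory under $flink_home" >&2
        return 1
    fi
    for jar in "$@"; do
        cp "$jar" "$flink_home/lib/" || return 1
    done
    # List what ended up in lib so the result can be checked by eye.
    ls "$flink_home/lib"
}
```

For example: `install_jars flink-1.17.2-bin flink-doris-connector-1.17-25.1.0.jar <mysql-cdc-connector>.jar <mysql-driver>.jar`, where the last two names are placeholders for the files you downloaded.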

6. Submit the Flink synchronization job

When the job is submitted, the Doris Connector automatically creates the corresponding tables in VeloDB, based on the schema of the upstream MySQL tables, before synchronization starts.

Flink supports submitting and running jobs in Local, Standalone, YARN, and Kubernetes modes. If you already have a Flink environment, you can submit the job directly to it.

6.1 Local Environment

# Submit the job to a local Flink mini-cluster (run from the Flink home directory)
cd flink-1.17.2-bin
bin/flink run -t local \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-25.1.0.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=database-test.cluster-ro-ckbuyoqerz2c.us-east-1.rds.amazonaws.com \
    --mysql-conf port=3306 \
    --mysql-conf username=admin \
    --mysql-conf password=123456 \
    --mysql-conf database-name=test_db \
    --including-tables "student" \
    --sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
    --sink-conf sink.label-prefix=label

6.2 Standalone Environment

# Submit the job to a running standalone cluster
cd flink-1.17.2-bin
bin/flink run -t remote \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-25.1.0.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=database-test.cluster-ro-ckbuyoqerz2c.us-east-1.rds.amazonaws.com \
    --mysql-conf port=3306 \
    --mysql-conf username=admin \
    --mysql-conf password=123456 \
    --mysql-conf database-name=test_db \
    --including-tables "student" \
    --sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
    --sink-conf sink.label-prefix=label

6.3 Yarn Environment

# Submit the job to YARN in per-job mode
cd flink-1.17.2-bin
bin/flink run -t yarn-per-job \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-25.1.0.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=database-test.cluster-ro-ckbuyoqerz2c.us-east-1.rds.amazonaws.com \
    --mysql-conf port=3306 \
    --mysql-conf username=admin \
    --mysql-conf password=123456 \
    --mysql-conf database-name=test_db \
    --including-tables "student" \
    --sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
    --sink-conf sink.label-prefix=label

6.4 K8S Environment

# Submit the job to an existing Flink session on Kubernetes
cd flink-1.17.2-bin
bin/flink run -t kubernetes-session \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-25.1.0.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=database-test.cluster-ro-ckbuyoqerz2c.us-east-1.rds.amazonaws.com \
    --mysql-conf port=3306 \
    --mysql-conf username=admin \
    --mysql-conf password=123456 \
    --mysql-conf database-name=test_db \
    --including-tables "student" \
    --sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
    --sink-conf sink.label-prefix=label

Note: For more Connector parameters, refer to the Flink Doris Connector documentation.
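
The four submission commands differ only in the value passed to -t, so they can be folded into one wrapper. The sketch below is one possible way to do that, not something the Connector ships: the function name `submit_sync`, the `FLINK_BIN` override, and the environment variable names (`MYSQL_HOST`, `VELODB_HOST`, and so on) are all assumptions made for illustration. Reading the endpoints and credentials from the environment keeps them out of the script itself, and database-name is set to test_db to match the database created in step 2.

```shell
# Hypothetical wrapper around the submission command; only the -t value varies.
# Usage: submit_sync <target>   (local | remote | yarn-per-job | kubernetes-session)
# Connection settings come from environment variables (names are assumptions).
FLINK_BIN=${FLINK_BIN:-bin/flink}

submit_sync() {
    target=$1
    "$FLINK_BIN" run -t "$target" \
        -Dexecution.checkpointing.interval=10s \
        -Dparallelism.default=1 \
        -c org.apache.doris.flink.tools.cdc.CdcTools \
        lib/flink-doris-connector-1.17-25.1.0.jar \
        mysql-sync-database \
        --database test_db \
        --mysql-conf hostname="${MYSQL_HOST:?set MYSQL_HOST}" \
        --mysql-conf port=3306 \
        --mysql-conf username="${MYSQL_USER:?set MYSQL_USER}" \
        --mysql-conf password="${MYSQL_PASSWORD:?set MYSQL_PASSWORD}" \
        --mysql-conf database-name=test_db \
        --including-tables "student" \
        --sink-conf fenodes="${VELODB_HOST:?set VELODB_HOST}:8080" \
        --sink-conf username="${VELODB_USER:?set VELODB_USER}" \
        --sink-conf password="${VELODB_PASSWORD:?set VELODB_PASSWORD}" \
        --sink-conf jdbc-url="jdbc:mysql://${VELODB_HOST}:9030" \
        --sink-conf sink.label-prefix=label
}
```

With the variables exported, `submit_sync local` would run the Local variant from the Flink home directory, and `submit_sync yarn-per-job` the YARN one.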

7. Verify Historical Data Synchronization

On its first run, the Flink job synchronizes the full historical data. Check the synchronization status in VeloDB.
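
One simple check is to compare row counts: step 2 seeded the source table with 5 rows, so VeloDB should report 5 once the full load has finished. The sketch below assumes the `mysql` command-line client is installed and that VeloDB accepts MySQL-protocol connections on port 9030, as the jdbc-url in the submission command suggests. The helper names and the `VELODB_HOST` / `VELODB_USER` variables are hypothetical.

```shell
# Hypothetical helper: run one SQL statement against VeloDB over the MySQL protocol.
# Assumes VELODB_HOST and VELODB_USER are set; the client prompts for the password.
velodb_query() {
    mysql -h "${VELODB_HOST:?set VELODB_HOST}" -P 9030 \
        -u "${VELODB_USER:?set VELODB_USER}" -p -B -N -e "$1"
}

# The source table was seeded with 5 rows, so a complete full load reports 5.
full_load_ok() {
    [ "$(velodb_query 'SELECT COUNT(*) FROM test_db.student;')" = "5" ]
}
```

For example: `full_load_ok && echo "full load complete"`. If the count is still lower, the job may simply not have finished its first checkpoint yet.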

8. Verify Real-Time Data Synchronization

Perform data modifications in MySQL:

INSERT INTO test_db.student (id, name, age, email, phone, score, created_at)
VALUES
(6, 'Frank Zhao', 24, 'frank@example.com', '13400134000', 88.75, NOW());

DELETE FROM test_db.student WHERE id = 3;

UPDATE test_db.student
SET score = 95.00, age = 23
WHERE id = 2;

Verify data changes in VeloDB:
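
The three changes can also be checked with a single query. This is a minimal sketch under the same assumptions as the rest of the walkthrough: the `mysql` client is installed, VeloDB listens on port 9030, and the `VELODB_HOST` / `VELODB_USER` variables and the function name `incremental_sync_ok` are hypothetical. The query returns id 2 only if its update arrived, id 6 only if the insert arrived, and id 3 only if the delete has not been applied, so the expected result is exactly the ids 2 and 6.

```shell
# Hypothetical check for the insert, delete, and update made in MySQL.
incremental_sync_ok() {
    rows=$(mysql -h "${VELODB_HOST:?set VELODB_HOST}" -P 9030 \
        -u "${VELODB_USER:?set VELODB_USER}" -p -B -N \
        -e "SELECT id FROM test_db.student WHERE id = 3 OR id = 6 OR (id = 2 AND age = 23 AND score = 95.00) ORDER BY id;")
    # Expect id 2 (updated) and id 6 (inserted), and no id 3 (deleted).
    expected=$(printf '2\n6')
    [ "$rows" = "$expected" ]
}
```

For example: `incremental_sync_ok && echo "incremental changes applied"`. With the 10s checkpoint interval used in the submission command, changes may take a short while to become visible, so a failed check right after the modification is worth retrying.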
