DEV Community

Apache Doris

Overview of Real-Time Data Synchronization from PostgreSQL to VeloDB

Overview

When migrating data from PostgreSQL (including PostgreSQL-compatible Amazon Aurora) to VeloDB, Flink can serve as the real-time synchronization engine that keeps the two systems consistent with low latency. Flink's high-throughput, low-latency stream processing handles both the initial full load and the ongoing incremental changes efficiently.

For real-time synchronization, enable PostgreSQL's Logical Replication to capture CDC (Change Data Capture) events. Whether the source is self-hosted PostgreSQL or cloud-based Amazon Aurora PostgreSQL, Flink CDC can subscribe to changes once the logical decoding plugin is enabled and a Replication Slot is created, which provides:

  • Initial full load: first import the existing business data from PostgreSQL/Aurora into VeloDB

  • Real-time synchronization of incremental changes: capture Insert/Update/Delete operations through Logical Replication and continuously write them to VeloDB

The following example uses Amazon Aurora PostgreSQL to demonstrate how Flink CDC subscribes to Aurora changes and synchronizes them to VeloDB in real time.

Example

1. Create an AWS RDS Aurora PostgreSQL instance

2. Create a VeloDB warehouse

3. Create a PostgreSQL database and corresponding tables


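CREATE DATABASE test_db;

-- Connect to test_db before continuing (in psql: \c test_db).
-- PostgreSQL only accepts a database qualifier that names the current database.

-- Create table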
CREATE TABLE test_db.public.student (
    id INT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    age INT,
    email VARCHAR(255),
    phone VARCHAR(20),
    score NUMERIC(5,2),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Load data
INSERT INTO test_db.public.student (id, name, age, email, phone, score, created_at) 
VALUES 
(1, 'Alice Zhang', 22, 'alice@example.com', '13800138000', 89.50, NOW()),
(2, 'Bob Li', 21, 'bob@example.com', '13900139000', 76.80, NOW()),
(3, 'Charlie Wang', 23, 'charlie@example.com', '13600136000', 92.00, NOW()),
(4, 'David Chen', 20, 'david@example.com', '13500135000', 85.60, NOW()),
(5, 'Emma Liu', 22, 'emma@example.com', '13700137000', 78.90, NOW());

4. Enable PostgreSQL Logical Replication

Create a DB cluster parameter group and set the rds.logical_replication parameter to 1.

Then modify the PostgreSQL cluster configuration: replace its DB cluster parameter group with the one just created, apply the change, and reboot the instance so that it takes effect.
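The console steps above can also be scripted. The following is a minimal sketch using the AWS CLI, not the procedure the article itself follows; the parameter group name, the `aurora-postgresql15` family, and the cluster and instance identifiers are hypothetical placeholders to replace with your own values. By default the script only prints the commands (`DRY_RUN=1`).

```shell
#!/usr/bin/env bash
# Sketch: enable logical replication on Aurora PostgreSQL via the AWS CLI.
# All names below are hypothetical placeholders; adjust them to your cluster.
PARAM_GROUP="${PARAM_GROUP:-pg-logical-repl}"
PG_FAMILY="${PG_FAMILY:-aurora-postgresql15}"
CLUSTER_ID="${CLUSTER_ID:-database-test}"
INSTANCE_ID="${INSTANCE_ID:-database-test-instance-1}"
DRY_RUN="${DRY_RUN:-1}"   # 1 = print commands only, 0 = execute them

# Print the command in dry-run mode, otherwise run it.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

enable_logical_replication() {
  # 1. Create a custom DB cluster parameter group.
  run aws rds create-db-cluster-parameter-group \
    --db-cluster-parameter-group-name "$PARAM_GROUP" \
    --db-parameter-group-family "$PG_FAMILY" \
    --description "Logical replication for Flink CDC"
  # 2. Turn on rds.logical_replication (a static parameter, so a reboot is needed).
  run aws rds modify-db-cluster-parameter-group \
    --db-cluster-parameter-group-name "$PARAM_GROUP" \
    --parameters "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot"
  # 3. Attach the parameter group to the cluster.
  run aws rds modify-db-cluster \
    --db-cluster-identifier "$CLUSTER_ID" \
    --db-cluster-parameter-group-name "$PARAM_GROUP"
  # 4. Reboot the instance so the change takes effect.
  run aws rds reboot-db-instance --db-instance-identifier "$INSTANCE_ID"
}

enable_logical_replication
```

After the reboot, running `SHOW wal_level;` in psql should report `logical`.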

5. Install Flink with the Doris Connector

5.1 Download the prebuilt installation package

We provide a prebuilt installation package based on Flink 1.17 that can be downloaded and extracted directly.

5.2 Manual installation

If you already have a Flink environment or need a different Flink version, install manually instead. Taking Flink 1.17 as an example, download the Flink installation package and its dependencies:

  • Flink 1.17

  • Flink Postgres CDC Connector

  • Flink Doris Connector

After the download is complete, extract the Flink installation package.


tar -zxvf flink-1.17.2-bin-scala_2.12.tgz

Then place the Flink PostgreSQL CDC Connector and the Doris Connector jars in the flink-1.17.2-bin/lib directory.
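One way to script this step is sketched below. It is a minimal example, not part of the original setup: the `FLINK_HOME` variable, the two helper functions, and the jar name patterns are assumptions, so match the patterns to the files you actually downloaded.

```shell
#!/usr/bin/env bash
# Sketch: copy the connector jars into Flink's lib directory and verify them.
# The jar name patterns are assumptions; match them to the files you downloaded.
FLINK_HOME="${FLINK_HOME:-flink-1.17.2-bin}"

# Copy every jar given as an argument into $FLINK_HOME/lib.
install_connectors() {
  mkdir -p "$FLINK_HOME/lib"
  cp "$@" "$FLINK_HOME/lib/"
}

# Return 0 only if both connector jars are present in $FLINK_HOME/lib.
check_connectors() {
  local pattern missing=0
  for pattern in 'flink-doris-connector-*.jar' '*postgres-cdc*.jar'; do
    if ! ls "$FLINK_HOME"/lib/$pattern >/dev/null 2>&1; then
      echo "missing: $pattern" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For example, `install_connectors ~/Downloads/*.jar && check_connectors` copies the downloaded jars and fails loudly if either connector is absent.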

6. Submit the Flink synchronization job

When the job is submitted, the Doris Connector automatically creates the corresponding tables in VeloDB ahead of time, based on the table structure of the upstream PostgreSQL database.

Flink supports submitting and running jobs in Local, Standalone, YARN, and Kubernetes modes. If you already have a Flink environment, you can submit the job to it directly.

6.1 Local Environment


cd flink-1.17.2-bin
bin/flink run -t local \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-25.1.0.jar \
    postgres-sync-database \
    --database test_db \
    --postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
    --postgres-conf port=5432 \
    --postgres-conf username=admin \
    --postgres-conf password=123456 \
    --postgres-conf database-name=test_db \
    --including-tables "student" \
    --sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
    --sink-conf sink.label-prefix=label

6.2 Standalone Environment


cd flink-1.17.2-bin
bin/flink run -t remote \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-25.1.0.jar \
    postgres-sync-database \
    --database test_db \
    --postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
    --postgres-conf port=5432 \
    --postgres-conf username=admin \
    --postgres-conf password=123456 \
    --postgres-conf database-name=test_db \
    --including-tables "student" \
    --sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
    --sink-conf sink.label-prefix=label

6.3 Yarn Environment


cd flink-1.17.2-bin
bin/flink run -t yarn-per-job \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-25.1.0.jar \
    postgres-sync-database \
    --database test_db \
    --postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
    --postgres-conf port=5432 \
    --postgres-conf username=admin \
    --postgres-conf password=123456 \
    --postgres-conf database-name=test_db \
    --including-tables "student" \
    --sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
    --sink-conf sink.label-prefix=label

6.4 K8S Environment


cd flink-1.17.2-bin
bin/flink run -t kubernetes-session \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-25.1.0.jar \
    postgres-sync-database \
    --database test_db \
    --postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
    --postgres-conf port=5432 \
    --postgres-conf username=admin \
    --postgres-conf password=123456 \
    --postgres-conf database-name=test_db \
    --including-tables "student" \
    --sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
    --sink-conf sink.label-prefix=label

Note: For more parameters of the Connector, refer to this link.
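The four commands in this step differ only in the `-t` target, so they can be generated by one wrapper. The sketch below is one possible way to do that, assuming PostgreSQL's default port 5432 and the `test_db` database created in step 3; the environment variable names and the `build_sync_cmd` helper are hypothetical, and the hosts and credentials are the article's placeholders.

```shell
#!/usr/bin/env bash
# Sketch: one wrapper for all deployment targets; only the -t value changes.
# Variable names are hypothetical; defaults are the article's placeholders.
PG_HOST="${PG_HOST:-database-test.xxx.us-east-1.rds.amazonaws.com}"
PG_PORT="${PG_PORT:-5432}"   # PostgreSQL default port (assumption)
PG_USER="${PG_USER:-admin}"
PG_PASSWORD="${PG_PASSWORD:-123456}"
VELODB_HOST="${VELODB_HOST:-lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com}"
VELODB_USER="${VELODB_USER:-admin}"
VELODB_PASSWORD="${VELODB_PASSWORD:-123456}"

# Fill the global array SYNC_CMD with the flink command for the given target
# (local, remote, yarn-per-job, or kubernetes-session).
build_sync_cmd() {
  local target="$1"
  SYNC_CMD=(
    bin/flink run -t "$target"
    -Dexecution.checkpointing.interval=10s
    -Dparallelism.default=1
    -c org.apache.doris.flink.tools.cdc.CdcTools
    lib/flink-doris-connector-1.17-25.1.0.jar
    postgres-sync-database
    --database test_db
    --postgres-conf "hostname=$PG_HOST"
    --postgres-conf "port=$PG_PORT"
    --postgres-conf "username=$PG_USER"
    --postgres-conf "password=$PG_PASSWORD"
    --postgres-conf database-name=test_db
    --including-tables "student"
    --sink-conf "fenodes=$VELODB_HOST:8080"
    --sink-conf "username=$VELODB_USER"
    --sink-conf "password=$VELODB_PASSWORD"
    --sink-conf "jdbc-url=jdbc:mysql://$VELODB_HOST:9030"
    --sink-conf sink.label-prefix=label
  )
}

# Example: print the command for the local target instead of running it.
build_sync_cmd local
printf '%q ' "${SYNC_CMD[@]}"; echo
```

To submit the job for real, run `"${SYNC_CMD[@]}"` from inside the Flink directory after calling `build_sync_cmd` with the target you need.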

7. Verify Historical Data Synchronization

On its first run, the Flink job synchronizes all historical data. Check the synchronization status in VeloDB.
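A quick way to check the initial load is to compare row counts on both sides. The following is a minimal sketch under stated assumptions: the `psql` and `mysql` clients are installed, VeloDB is reachable over the MySQL protocol on port 9030 (as in the `jdbc-url` above), PostgreSQL listens on its default port 5432, and the synchronized table is `test_db.student`. The function names and environment variables are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: compare source and target row counts after the initial load.
# Hosts and credentials are the article's placeholders.
PG_HOST="${PG_HOST:-database-test.xxx.us-east-1.rds.amazonaws.com}"
VELODB_HOST="${VELODB_HOST:-lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com}"
VELODB_PASSWORD="${VELODB_PASSWORD:-123456}"

# Row count of public.student in PostgreSQL
# (supply the password via PGPASSWORD or ~/.pgpass).
pg_count() {
  psql -h "$PG_HOST" -p 5432 -U admin -d test_db -At \
    -c 'SELECT COUNT(*) FROM public.student'
}

# Row count of the synchronized table in VeloDB, over the MySQL protocol.
velodb_count() {
  mysql -h "$VELODB_HOST" -P 9030 -u admin -p"$VELODB_PASSWORD" -N -B \
    -e 'SELECT COUNT(*) FROM test_db.student'
}

# Succeed only when both counts are equal.
counts_match() {
  [ "$1" -eq "$2" ]
}
```

For example, `counts_match "$(pg_count)" "$(velodb_count)" && echo "initial load complete"`. With the sample data from step 3, both counts should be 5.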

8. Verify Real-Time Data Synchronization

To capture deleted rows, set the table's replica identity to FULL in PostgreSQL:


ALTER TABLE public.student REPLICA IDENTITY FULL;

For details, refer to this link.
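To confirm that the setting took effect, the replica identity can be read back from the `pg_class` catalog. This is a small sketch, assuming `psql` is installed and PostgreSQL listens on its default port 5432; the helper function names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: confirm the replica identity of public.student.
# Connection values are the article's placeholders.
PG_HOST="${PG_HOST:-database-test.xxx.us-east-1.rds.amazonaws.com}"

# Print the one-letter relreplident code stored in pg_class.
replica_identity_code() {
  psql -h "$PG_HOST" -p 5432 -U admin -d test_db -At \
    -c "SELECT relreplident FROM pg_class WHERE oid = 'public.student'::regclass"
}

# Translate the code: d = default, n = nothing, f = full, i = index.
describe_replica_identity() {
  case "$1" in
    d) echo "default" ;;
    n) echo "nothing" ;;
    f) echo "full" ;;
    i) echo "index" ;;
    *) echo "unknown" ;;
  esac
}
```

Running `describe_replica_identity "$(replica_identity_code)"` should print `full` once the ALTER TABLE statement has been applied.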

Perform data modifications in PostgreSQL:


INSERT INTO student (id, name, age, email, phone, score, created_at) 
VALUES 
(6, 'Frank Zhao', 24, 'frank@example.com', '13400134000', 88.75, NOW());

DELETE FROM student WHERE id = 3;

UPDATE student 
SET score = 95.00, age = 23 
WHERE id = 2;

Verify data changes in VeloDB:
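The check can also be automated. The sketch below assumes the `mysql` client is installed and that VeloDB is reachable over the MySQL protocol on port 9030; the function names are hypothetical. It verifies the three changes made above: the row with id 6 exists, the row with id 3 is gone, and the row with id 2 now has age 23 and score 95.00.

```shell
#!/usr/bin/env bash
# Sketch: check that the insert, delete, and update reached VeloDB.
# Host and credentials are the article's placeholders.
VELODB_HOST="${VELODB_HOST:-lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com}"
VELODB_PASSWORD="${VELODB_PASSWORD:-123456}"

# Print tab-separated "id age score" rows from the synchronized table.
fetch_students() {
  mysql -h "$VELODB_HOST" -P 9030 -u admin -p"$VELODB_PASSWORD" -N -B \
    -e 'SELECT id, age, score FROM test_db.student ORDER BY id'
}

# Read rows on stdin; succeed only if id 6 exists, id 3 is gone,
# and id 2 carries the updated age and score.
verify_changes() {
  awk -F'\t' '
    $1 == 6 { inserted = 1 }
    $1 == 3 { still_there = 1 }
    $1 == 2 && $2 == 23 && $3 + 0 == 95 { updated = 1 }
    END { exit !(inserted && !still_there && updated) }
  '
}
```

For example, `fetch_students | verify_changes && echo "changes synchronized"`. Allow a few seconds after the PostgreSQL statements, since changes arrive with the 10s checkpoint interval configured for the job.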
