Overview
When migrating data from PostgreSQL (including PostgreSQL-compatible Amazon Aurora) to VeloDB, Flink can serve as the real-time synchronization engine that keeps the target consistent and up to date. Flink provides high-throughput, low-latency stream processing, which makes both the initial full load and the handling of incremental changes efficient.
For real-time synchronization, enable PostgreSQL logical replication so that changes can be captured as CDC (Change Data Capture) events. Whether you run self-hosted PostgreSQL or Amazon Aurora PostgreSQL, Flink CDC can subscribe to these changes once a logical decoding plugin is enabled and a replication slot is created. This gives you:
Full data initial load: First import business data from PostgreSQL/Aurora into VeloDB
Real-time synchronization of incremental changes: Capture Insert/Update/Delete operations based on Logical Replication and continuously write them to VeloDB
The following takes Amazon Aurora-PostgreSQL as an example to demonstrate how to use Flink CDC to subscribe to Aurora changes and synchronize them to VeloDB in real time.
Example
1. Create an AWS RDS Aurora PostgreSQL instance
2. Create a VeloDB warehouse
3. Create a PostgreSQL database and corresponding tables
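CREATE DATABASE test_db;
-- Connect to test_db before running the statements below (for example, \c test_db in psql);
-- PostgreSQL does not allow cross-database references.
-- Create table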
CREATE TABLE test_db.public.student (
id INT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
age INT,
email VARCHAR(255),
phone VARCHAR(20),
score NUMERIC(5,2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Load data
INSERT INTO test_db.public.student (id, name, age, email, phone, score, created_at)
VALUES
(1, 'Alice Zhang', 22, 'alice@example.com', '13800138000', 89.50, NOW()),
(2, 'Bob Li', 21, 'bob@example.com', '13900139000', 76.80, NOW()),
(3, 'Charlie Wang', 23, 'charlie@example.com', '13600136000', 92.00, NOW()),
(4, 'David Chen', 20, 'david@example.com', '13500135000', 85.60, NOW()),
(5, 'Emma Liu', 22, 'emma@example.com', '13700137000', 78.90, NOW());
4. Enable PostgreSQL Logical Replication
Create a DB cluster parameter group and set the rds.logical_replication parameter to 1.
Modify the Aurora cluster so that it uses the parameter group you just created, apply the changes, and reboot the instance so that the new setting takes effect.
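The console steps above can also be scripted. The following is a minimal sketch using the AWS CLI, not a definitive procedure. The parameter group name, the cluster and instance identifiers, and the `aurora-postgresql15` family are placeholders assumed for illustration. With `DRY_RUN=1` (the default here) the commands are only printed, so you can review them before running anything.

```shell
#!/usr/bin/env bash
# Sketch: enable logical replication on an Aurora PostgreSQL cluster via the AWS CLI.
# All names below are hypothetical placeholders; replace them with your own.
PG_GROUP="${PG_GROUP:-aurora-pg-logical}"               # assumed parameter group name
PG_FAMILY="${PG_FAMILY:-aurora-postgresql15}"           # assumed engine family
CLUSTER_ID="${CLUSTER_ID:-database-test}"               # assumed cluster identifier
INSTANCE_ID="${INSTANCE_ID:-database-test-instance-1}"  # assumed writer instance
DRY_RUN="${DRY_RUN:-1}"

# In dry-run mode, print the command (without shell quoting); otherwise execute it.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

enable_logical_replication() {
  # 1. Create a cluster parameter group.
  run aws rds create-db-cluster-parameter-group \
    --db-cluster-parameter-group-name "$PG_GROUP" \
    --db-parameter-group-family "$PG_FAMILY" \
    --description "Enable logical replication for Flink CDC"
  # 2. Turn on logical replication (static parameter, applied at reboot).
  run aws rds modify-db-cluster-parameter-group \
    --db-cluster-parameter-group-name "$PG_GROUP" \
    --parameters "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot"
  # 3. Attach the parameter group to the cluster.
  run aws rds modify-db-cluster \
    --db-cluster-identifier "$CLUSTER_ID" \
    --db-cluster-parameter-group-name "$PG_GROUP" \
    --apply-immediately
  # 4. Reboot the instance so the setting takes effect.
  run aws rds reboot-db-instance --db-instance-identifier "$INSTANCE_ID"
}

enable_logical_replication
```

After the reboot, running `SHOW wal_level;` in PostgreSQL should return `logical`.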
5. Install Flink with the Doris Connector
5.1 Download the pre-built installation package
We provide a pre-built installation package based on Flink 1.17 that can be downloaded and extracted directly.
5.2 Manual installation
If you already have a Flink environment or need a different Flink version, you can install manually. Taking Flink 1.17 as an example, download the Flink installation package and its dependencies:
Flink 1.17
Flink Postgres CDC Connector
Flink Doris Connector
After the download is complete, extract the Flink installation package.
tar -zxvf flink-1.17.2-bin-scala_2.12.tgz
Then place the Flink PostgreSQL CDC Connector and the Doris Connector in the flink-1.17.2-bin/lib directory.
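To confirm that both connectors ended up in Flink's `lib` directory, a quick check like the following may help. It is a sketch only: the jar name patterns (`flink-doris-connector-*.jar` and `flink-sql-connector-postgres-cdc-*.jar`) are assumptions based on common release naming and may need adjusting to the files you actually downloaded. `check_connectors` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch: check that the connector jars are present in Flink's lib directory.
# The glob patterns are assumptions; adjust them to the jars you downloaded.
check_connectors() {
  lib_dir="$1"
  missing=0
  for pattern in 'flink-doris-connector-*.jar' 'flink-sql-connector-postgres-cdc-*.jar'; do
    # $pattern is left unquoted on purpose so the shell expands the glob;
    # ls fails when nothing matches.
    if ls "$lib_dir"/$pattern > /dev/null 2>&1; then
      echo "found:   $pattern"
    else
      echo "missing: $pattern"
      missing=1
    fi
  done
  return "$missing"
}

# Usage: check_connectors flink-1.17.2-bin/lib
```

The function returns a non-zero status if either jar is missing, so it can gate a job submission script.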
6. Submit the Flink synchronization job
When the job is submitted, the Doris Connector automatically creates the corresponding tables in VeloDB, based on the table structure of the upstream PostgreSQL database, before synchronization starts.
Flink supports submitting and running jobs in several modes, including Local, Standalone, Yarn, and Kubernetes. If you already have a Flink environment, you can submit the job to it directly.
6.1 Local Environment
cd flink-1.17.2-bin
bin/flink run -t local \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
postgres-sync-database \
--database test_db \
--postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
--postgres-conf port=5432 \
--postgres-conf username=admin \
--postgres-conf password=123456 \
--postgres-conf database-name=test_db \
--including-tables "student" \
--sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
--sink-conf sink.label-prefix=label
6.2 Standalone Environment
cd flink-1.17.2-bin
bin/flink run -t remote \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
postgres-sync-database \
--database test_db \
--postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
--postgres-conf port=5432 \
--postgres-conf username=admin \
--postgres-conf password=123456 \
--postgres-conf database-name=test_db \
--including-tables "student" \
--sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
--sink-conf sink.label-prefix=label
6.3 Yarn Environment
cd flink-1.17.2-bin
bin/flink run -t yarn-per-job \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
postgres-sync-database \
--database test_db \
--postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
--postgres-conf port=5432 \
--postgres-conf username=admin \
--postgres-conf password=123456 \
--postgres-conf database-name=test_db \
--including-tables "student" \
--sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
--sink-conf sink.label-prefix=label
6.4 K8S Environment
cd flink-1.17.2-bin
bin/flink run -t kubernetes-session \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
postgres-sync-database \
--database test_db \
--postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
--postgres-conf port=5432 \
--postgres-conf username=admin \
--postgres-conf password=123456 \
--postgres-conf database-name=test_db \
--including-tables "student" \
--sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
--sink-conf sink.label-prefix=label
Note: For more parameters of the Connector, refer to this link.
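The four commands above differ only in the `-t` deployment target. One way to avoid copy-paste drift is to assemble the command from a single template. The sketch below only prints the resulting command. The variable names and the `build_sync_cmd` helper are hypothetical, the host names and credentials are the placeholders from this example, and 5432 is PostgreSQL's default port (assumed here; use whichever port your cluster listens on).

```shell
#!/usr/bin/env bash
# Sketch: build the CdcTools command once and vary only the deployment target.
# Every value below is a placeholder; override it via environment variables.
PG_HOST="${PG_HOST:-database-test.xxx.us-east-1.rds.amazonaws.com}"
PG_PORT="${PG_PORT:-5432}"          # PostgreSQL default port (assumption)
PG_DB="${PG_DB:-test_db}"           # used for both the source and the VeloDB database
PG_USER="${PG_USER:-admin}"
PG_PASSWORD="${PG_PASSWORD:-123456}"
VELODB_HOST="${VELODB_HOST:-lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com}"
VELODB_USER="${VELODB_USER:-admin}"
VELODB_PASSWORD="${VELODB_PASSWORD:-123456}"

# Prints the flink command for one target:
# local, remote, yarn-per-job, or kubernetes-session.
build_sync_cmd() {
  target="$1"
  cat <<EOF
bin/flink run -t $target \\
  -Dexecution.checkpointing.interval=10s \\
  -Dparallelism.default=1 \\
  -c org.apache.doris.flink.tools.cdc.CdcTools \\
  lib/flink-doris-connector-1.17-25.1.0.jar \\
  postgres-sync-database \\
  --database $PG_DB \\
  --postgres-conf hostname=$PG_HOST \\
  --postgres-conf port=$PG_PORT \\
  --postgres-conf username=$PG_USER \\
  --postgres-conf password=$PG_PASSWORD \\
  --postgres-conf database-name=$PG_DB \\
  --including-tables "student" \\
  --sink-conf fenodes=$VELODB_HOST:8080 \\
  --sink-conf username=$VELODB_USER \\
  --sink-conf password=$VELODB_PASSWORD \\
  --sink-conf jdbc-url=jdbc:mysql://$VELODB_HOST:9030 \\
  --sink-conf sink.label-prefix=label
EOF
}

# Usage: print the command for a local run.
build_sync_cmd local
```

Once the printed command looks right, it can be pasted into a shell opened in the Flink home directory.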
7. Verify Historical Data Synchronization
On its first run, the Flink job synchronizes the full historical data. Check the synchronization status in VeloDB.
8. Verify Real-Time Data Synchronization
To capture deleted data, set the table's replica identity to FULL in PostgreSQL:
ALTER TABLE public.student REPLICA IDENTITY FULL;
For details, refer to this link.
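To check which replica identity a table currently uses, you can read `relreplident` from the `pg_class` catalog, which stores it as a single character. The sketch below runs that query through `psql` and translates the code. The connection variables (`PG_HOST`, `PG_USER`) and both helper names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: report the replica identity of a table.
# pg_class.relreplident codes: d = default, n = nothing, f = full, i = index.
replica_identity_name() {
  case "$1" in
    d) echo "DEFAULT" ;;
    n) echo "NOTHING" ;;
    f) echo "FULL" ;;
    i) echo "USING INDEX" ;;
    *) echo "UNKNOWN"; return 1 ;;
  esac
}

# Hypothetical wrapper: queries PostgreSQL (requires psql and network access).
show_replica_identity() {
  table="${1:-public.student}"
  code="$(psql -h "$PG_HOST" -U "$PG_USER" -d test_db -Atc \
    "SELECT relreplident FROM pg_class WHERE oid = '$table'::regclass;")" || return 1
  replica_identity_name "$code"
}

# Usage: show_replica_identity public.student
```

After the `ALTER TABLE` statement above, the helper should report `FULL` for `public.student`.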
Perform data modifications in PostgreSQL:
INSERT INTO student (id, name, age, email, phone, score, created_at)
VALUES
(6, 'Frank Zhao', 24, 'frank@example.com', '13400134000', 88.75, NOW());
DELETE FROM student WHERE id = 3;
UPDATE student
SET score = 95.00, age = 23
WHERE id = 2;
Verify data changes in VeloDB:
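After the three statements above, VeloDB should contain ids 1, 2, 4, 5, and 6, with row 2 showing `age = 23` and `score = 95.00`. The sketch below checks exactly that from tab-separated query output. It assumes the table landed in VeloDB as `test_db.student` and that you query it with the `mysql` client through the port 9030 endpoint used in the job. `verify_changes` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch: validate the rows returned by
#   SELECT id, age, score FROM test_db.student ORDER BY id;
# Input: tab-separated lines "id<TAB>age<TAB>score" on stdin.
verify_changes() {
  awk -F '\t' '
    $1 == 3 { deleted_still_there = 1 }                  # row 3 should be gone
    $1 == 6 { inserted = 1 }                             # row 6 should exist
    $1 == 2 && $2 == 23 && $3 + 0 == 95 { updated = 1 }  # row 2 should be updated
    END { exit !(inserted && updated && !deleted_still_there) }
  '
}

# Usage (requires the mysql client and network access to VeloDB):
# mysql -h "$VELODB_HOST" -P 9030 -u admin -p -N -B \
#   -e "SELECT id, age, score FROM test_db.student ORDER BY id" \
#   | verify_changes && echo "changes synchronized"
```

The function exits with status 0 only when the insert, the update, and the delete are all visible in the output.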







