Migrating data from PostgreSQL (or Amazon Aurora PostgreSQL) to VeloDB while ensuring real-time consistency can be a challenge. Flink CDC (Change Data Capture) solves this problem with high throughput and low latency. This step-by-step guide walks you through using Flink CDC to sync data from Aurora PostgreSQL to VeloDB, covering both the full initial load and incremental change capture.
Overview
When syncing PostgreSQL/Aurora to VeloDB, Flink acts as the real-time stream processing engine, and PostgreSQL’s Logical Replication captures CDC events. This combination enables:
Full data initial load: Import existing business data from PostgreSQL/Aurora to VeloDB in one go.
Real-time incremental sync: Capture INSERT/UPDATE/DELETE operations from PostgreSQL and write them to VeloDB continuously.
We’ll use Amazon Aurora-PostgreSQL as the source and VeloDB as the sink to demonstrate the entire process.
Prerequisites
Before starting, ensure you have:
An AWS RDS Aurora PostgreSQL instance (or self-hosted PostgreSQL).
A VeloDB warehouse (with FE nodes accessible via network).
Flink 1.17+ environment (we’ll cover both pre-built and manual installation).
Network connectivity between Flink, PostgreSQL/Aurora, and VeloDB (e.g., security groups, VPC peering).
Step 1: Set Up Aurora-PostgreSQL & Test Data
1.1 Create an Aurora-PostgreSQL Instance
First, create an AWS RDS Aurora PostgreSQL instance (skip this if you already have one).
1.2 Create a Database and Table
Connect to your Aurora-PostgreSQL instance and run the following SQL to create a test database and table:
CREATE DATABASE test_db;
-- Connect to test_db (e.g., \c test_db in psql), then create the table.
-- Note: PostgreSQL does not support cross-database qualified names like test_db.public.student.
CREATE TABLE public.student (
id INT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
age INT,
email VARCHAR(255),
phone VARCHAR(20),
score NUMERIC(5,2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Insert sample data
INSERT INTO public.student (id, name, age, email, phone, score, created_at)
VALUES
(1, 'Alice Zhang', 22, 'alice@example.com', '13800138000', 89.50, NOW()),
(2, 'Bob Li', 21, 'bob@example.com', '13900139000', 76.80, NOW()),
(3, 'Charlie Wang', 23, 'charlie@example.com', '13600136000', 92.00, NOW()),
(4, 'David Chen', 20, 'david@example.com', '13500135000', 85.60, NOW()),
(5, 'Emma Liu', 22, 'emma@example.com', '13700137000', 78.90, NOW());
1.3 Enable PostgreSQL Logical Replication
Modify the PostgreSQL configuration: create a custom DB cluster parameter group with rds.logical_replication = 1 (which sets wal_level to logical), attach it to your Aurora cluster in place of the default one, apply the changes, and reboot the instance so they take effect.
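Once the instance is back up, it's worth confirming the setting actually took effect before wiring up Flink. A quick check, assuming you can connect with psql:

```sql
-- Run against the Aurora-PostgreSQL instance; wal_level should be 'logical'
SHOW wal_level;
SELECT name, setting
FROM pg_settings
WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders');
```

If wal_level still shows 'replica', the parameter group change has not been applied yet.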
Step 2: Install Flink with Doris/VeloDB Connector
VeloDB is compatible with the Flink Doris Connector, so we’ll use that to connect Flink to VeloDB. You can choose either the pre-built package or manual installation.
2.1 Pre-Built Installation (Simplest)
We provide a pre-built Flink 1.17 package with all required connectors (PostgreSQL CDC + Doris/VeloDB). Simply download and extract it:
tar -zxvf flink-1.17.2-bin-scala_2.12.tgz
2.2 Manual Installation (For Existing Flink Environments)
If you already have Flink 1.17 installed, download the required dependencies and add them to the lib directory:
Download Flink 1.17.2: Flink 1.17.2 Download
Download Flink PostgreSQL CDC Connector: Flink Postgres CDC
Download Flink Doris Connector: Flink Doris Connector
Step 3: Submit the Flink CDC Sync Job
The Flink Doris Connector will automatically create corresponding tables in VeloDB based on the PostgreSQL table structure. We’ll cover job submission in 4 common environments: Local, Standalone, Yarn, and Kubernetes.
Important Notes Before Submission
Port: PostgreSQL/Aurora listens on 5432 by default; do not use 3306, which is the MySQL port.
Database name: --postgres-conf database-name must match the source database (test_db in this guide).
Customize params: Replace placeholder values (e.g., hostname, fenodes, password) with your actual credentials.
3.1 Local Environment
> cd flink-1.17.2-bin
> bin/flink run -t local \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
postgres-sync-database \
--database test_db \
--postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
--postgres-conf port=5432 \
--postgres-conf username=admin \
--postgres-conf password=123456 \
--postgres-conf database-name=test_db \
--including-tables "student" \
--sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
--sink-conf sink.label-prefix=label
3.2 Standalone Environment
> cd flink-1.17.2-bin
> bin/flink run -t remote \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
postgres-sync-database \
--database test_db \
--postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
--postgres-conf port=5432 \
--postgres-conf username=admin \
--postgres-conf password=123456 \
--postgres-conf database-name=test_db \
--including-tables "student" \
--sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
--sink-conf sink.label-prefix=label
3.3 Yarn Environment (Per-Job Mode)
> cd flink-1.17.2-bin
> bin/flink run -t yarn-per-job \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
postgres-sync-database \
--database test_db \
--postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
--postgres-conf port=5432 \
--postgres-conf username=admin \
--postgres-conf password=123456 \
--postgres-conf database-name=test_db \
--including-tables "student" \
--sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
--sink-conf sink.label-prefix=label
3.4 Kubernetes Environment (Session Mode)
> cd flink-1.17.2-bin
> bin/flink run -t kubernetes-session \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
postgres-sync-database \
--database test_db \
--postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
--postgres-conf port=5432 \
--postgres-conf username=admin \
--postgres-conf password=123456 \
--postgres-conf database-name=test_db \
--including-tables "student" \
--sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
--sink-conf username=admin \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
--sink-conf sink.label-prefix=label
💡 Tip: For more Flink Doris Connector parameters, check the official documentation.
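Once the job is running, the CDC source creates a logical replication slot on the source database. You can inspect it from the PostgreSQL side to confirm the job is attached and consuming WAL (the slot name depends on the connector's slot configuration, so treat the output below as illustrative):

```sql
-- Run on the source PostgreSQL/Aurora instance
SELECT slot_name, plugin, slot_type, active, restart_lsn
FROM pg_replication_slots;
```

An active slot with an advancing restart_lsn indicates the Flink job is reading changes. Note that an abandoned slot (e.g., after killing the job without cleanup) will retain WAL on the source, so drop unused slots with pg_drop_replication_slot.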
Step 4: Verify Data Sync
4.1 Verify Full Historical Data Sync
The Flink job will first sync all existing data from PostgreSQL to VeloDB. Connect to your VeloDB warehouse and query the student table to confirm the data is present.
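For example, over the MySQL protocol (port 9030), assuming the connector created the table under a database named test_db in VeloDB:

```sql
-- Run against VeloDB
SELECT COUNT(*) FROM test_db.student;  -- should match the 5 rows inserted earlier
SELECT * FROM test_db.student ORDER BY id;
```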
4.2 Verify Real-Time Incremental Sync
To capture DELETE operations (required for full incremental sync), first enable full replica identity on the PostgreSQL table:
ALTER TABLE public.student REPLICA IDENTITY FULL;
📚 Reference: PostgreSQL Replica Identity Documentation
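You can confirm the change from the system catalog; a relreplident value of 'f' means FULL:

```sql
SELECT relname, relreplident
FROM pg_class
WHERE relname = 'student';
```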
Now, modify data in PostgreSQL:
INSERT INTO student (id, name, age, email, phone, score, created_at)
VALUES
(6, 'Frank Zhao', 24, 'frank@example.com', '13400134000', 88.75, NOW());
DELETE FROM student WHERE id = 3;
UPDATE student
SET score = 95.00, age = 23
WHERE id = 2;
Query VeloDB again to confirm the changes arrive in near real time: the new row (id 6) should appear, row 3 should be gone, and row 2 should show the updated score.
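A quick check, again assuming the table landed as test_db.student in VeloDB:

```sql
-- Run against VeloDB after the changes above
SELECT id, name, age, score FROM test_db.student ORDER BY id;
```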
Common Pitfalls to Avoid
PostgreSQL Port Mismatch: Don’t use 3306 (MySQL) for PostgreSQL—use 5432 instead.
Logical Replication Not Enabled: Without rds.logical_replication = 1, CDC events won't be captured.
Replica Identity Missing: For DELETE operations, REPLICA IDENTITY FULL is required; otherwise, deletes won't sync.
Network Connectivity: Ensure Flink can reach Aurora PostgreSQL (5432) and VeloDB (8080, 9030) via security groups/VPC.
Wrapping Up
Flink CDC provides a robust, real-time way to sync data from PostgreSQL/Aurora to VeloDB, covering both full data loads and incremental changes. By following this guide, you can set up a reliable sync pipeline with minimal effort. If you run into issues, check the Flink and VeloDB logs for details, or refer to the official documentation for additional parameters.
Happy syncing! 🚀