DEV Community

Apache Doris

Overview of Real-Time Data Synchronization from PostgreSQL to VeloDB

Migrating data from PostgreSQL (or Amazon Aurora-PostgreSQL) to VeloDB while keeping both sides consistent in real time can be challenging. Flink CDC (Change Data Capture) addresses this with high throughput and low latency. This step-by-step guide walks you through using Flink CDC to sync data from Aurora-PostgreSQL to VeloDB, covering both the full initial load and incremental change capture.

Overview

When syncing PostgreSQL/Aurora to VeloDB, Flink acts as the real-time stream processing engine, and PostgreSQL’s Logical Replication captures CDC events. This combination enables:

  • Full data initial load: Import existing business data from PostgreSQL/Aurora to VeloDB in one go.

  • Real-time incremental sync: Capture INSERT/UPDATE/DELETE operations from PostgreSQL and write them to VeloDB continuously.

We’ll use Amazon Aurora-PostgreSQL as the source and VeloDB as the sink to demonstrate the entire process.


Prerequisites

Before starting, ensure you have:

  • An AWS RDS Aurora PostgreSQL instance (or self-hosted PostgreSQL).

  • A VeloDB warehouse (with FE nodes accessible via network).

  • A Flink 1.17 environment. The commands below use flink-doris-connector-1.17, the connector build for Flink 1.17. We’ll cover both pre-built and manual installation.

  • Network connectivity between Flink, PostgreSQL/Aurora, and VeloDB (e.g., security groups, VPC peering).


Step 1: Set Up Aurora-PostgreSQL & Test Data

1.1 Create an Aurora-PostgreSQL Instance

First, create an AWS RDS Aurora PostgreSQL instance (skip this if you already have one).

1.2 Create a Database and Table

Connect to your Aurora-PostgreSQL instance and run the following SQL to create a test database and table:

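CREATE DATABASE test_db;

-- Switch to the new database (psql meta-command). PostgreSQL does not support
-- cross-database references, so the table must be created while connected to test_db.
\c test_db

-- Create the table
CREATE TABLE test_db.public.student (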
    id INT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    age INT,
    email VARCHAR(255),
    phone VARCHAR(20),
    score NUMERIC(5,2),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Insert test data
INSERT INTO test_db.public.student (id, name, age, email, phone, score, created_at) 
VALUES 
(1, 'Alice Zhang', 22, 'alice@example.com', '13800138000', 89.50, NOW()),
(2, 'Bob Li', 21, 'bob@example.com', '13900139000', 76.80, NOW()),
(3, 'Charlie Wang', 23, 'charlie@example.com', '13600136000', 92.00, NOW()),
(4, 'David Chen', 20, 'david@example.com', '13500135000', 85.60, NOW()),
(5, 'Emma Liu', 22, 'emma@example.com', '13700137000', 78.90, NOW());


1.3 Enable PostgreSQL Logical Replication

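Aurora does not enable logical replication by default. Create a custom DB cluster parameter group, set rds.logical_replication to 1 in it, and attach it to your cluster in place of the current DB cluster parameter group. Apply the change and reboot the writer instance, because this parameter only takes effect after a restart. On self-hosted PostgreSQL, set wal_level = logical in postgresql.conf and restart the server instead.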
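If you prefer the command line to the RDS console, the parameter group steps can be scripted with the AWS CLI. This is a minimal sketch, not an official procedure: the cluster, writer instance, and parameter group names are hypothetical placeholders (the cluster name is guessed from the example hostname), and `FAMILY` must match your Aurora PostgreSQL major version. Setting `DRY_RUN=1` prints the commands instead of running them.

```bash
# Sketch: enable logical replication on an Aurora PostgreSQL cluster with the AWS CLI.
# Assumptions: AWS CLI is installed and configured; CLUSTER_ID, WRITER_ID, PARAM_GROUP
# and FAMILY default to hypothetical placeholders. Override them via the environment.

run_or_echo() {
  # DRY_RUN=1 prints each command instead of executing it.
  if [[ "${DRY_RUN:-0}" == "1" ]]; then echo "$*"; else "$@"; fi
}

enable_logical_replication() {
  local cluster="${CLUSTER_ID:-database-test}"
  local writer="${WRITER_ID:-database-test-instance-1}"
  local group="${PARAM_GROUP:-pg-logical-replication}"
  local family="${FAMILY:-aurora-postgresql15}"

  # 1. Create a custom DB cluster parameter group (default groups cannot be edited).
  run_or_echo aws rds create-db-cluster-parameter-group \
    --db-cluster-parameter-group-name "$group" \
    --db-parameter-group-family "$family" \
    --description "Logical replication for Flink CDC" || return 1

  # 2. Turn on logical replication. It is a static parameter, so it waits for a reboot.
  run_or_echo aws rds modify-db-cluster-parameter-group \
    --db-cluster-parameter-group-name "$group" \
    --parameters "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot" || return 1

  # 3. Attach the new parameter group to the cluster.
  run_or_echo aws rds modify-db-cluster \
    --db-cluster-identifier "$cluster" \
    --db-cluster-parameter-group-name "$group" \
    --apply-immediately || return 1

  # 4. Reboot the writer instance so the parameter takes effect.
  run_or_echo aws rds reboot-db-instance --db-instance-identifier "$writer"
}
```

Once the writer is back up, running `SHOW wal_level;` in psql should return `logical`.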

Step 2: Install Flink with Doris/VeloDB Connector

VeloDB is compatible with the Flink Doris Connector, so we’ll use that to connect Flink to VeloDB. You can choose either the pre-built package or manual installation.

2.1 Pre-Built Installation (Simplest)

We provide a pre-built Flink 1.17 package with all required connectors (PostgreSQL CDC + Doris/VeloDB). Simply download and extract it:

tar -zxvf flink-1.17.2-bin-scala_2.12.tgz


2.2 Manual Installation (For Existing Flink Environments)

If you already run Flink 1.17 (or prefer to assemble the environment yourself), download the following and place the two connector JARs in Flink's lib directory:

  1. Download Flink 1.17.2 (skip this if Flink 1.17 is already installed): Flink 1.17.2 Download

  2. Download the Flink PostgreSQL CDC Connector: Flink Postgres CDC

  3. Download the Flink Doris Connector: Flink Doris Connector
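After downloading, a small helper can copy the connector JARs into `lib/` and fail loudly if one is missing. This is a sketch: `install_connectors` is a hypothetical helper, and the file-name patterns are assumptions about how the downloaded JARs are named, so adjust them to match your files.

```bash
# Sketch: copy the PostgreSQL CDC and Doris connector JARs into $FLINK_HOME/lib.
# Assumption: the downloaded file names match the two glob patterns below.
install_connectors() {
  local flink_home="$1" download_dir="$2"
  local lib_dir="$flink_home/lib"
  local pattern jar found

  if [[ ! -d "$lib_dir" ]]; then
    echo "error: $lib_dir not found; is $flink_home a Flink installation?" >&2
    return 1
  fi

  for pattern in 'flink-doris-connector-1.17-*.jar' '*postgres-cdc*.jar'; do
    found=0
    # $pattern is left unquoted on purpose so the shell expands the glob.
    for jar in "$download_dir"/$pattern; do
      [[ -e "$jar" ]] || continue  # an unmatched glob stays literal; skip it
      cp "$jar" "$lib_dir/"
      echo "installed $(basename "$jar")"
      found=1
    done
    if [[ $found -eq 0 ]]; then
      echo "error: no JAR matching $pattern in $download_dir" >&2
      return 1
    fi
  done
}
```

For example, `install_connectors /opt/flink-1.17.2 ~/Downloads` (both paths hypothetical). If a standalone or session cluster is already running, restart it so the JARs in `lib/` are picked up.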

Step 3: Submit the Flink CDC Sync Job

The Flink Doris Connector will automatically create corresponding tables in VeloDB based on the PostgreSQL table structure. We’ll cover job submission in 4 common environments: Local, Standalone, Yarn, and Kubernetes.

Important Notes Before Submission

  • Port: PostgreSQL/Aurora listens on 5432 by default, while 3306 is the MySQL port. Set --postgres-conf port=5432 (or your custom port).

  • Database name: --postgres-conf database-name must name the source PostgreSQL database, test_db in this example. (--database test_db is the target database in VeloDB.)

  • Customize params: Replace placeholder values (e.g., hostname, fenodes, password) with your own endpoints and credentials.

3.1 Local Environment

cd flink-1.17.2-bin
bin/flink run -t local \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-25.1.0.jar \
    postgres-sync-database \
    --database test_db \
    --postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
    --postgres-conf port=5432 \
    --postgres-conf username=admin \
    --postgres-conf password=123456 \
    --postgres-conf database-name=test_db \
    --including-tables "student" \
    --sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
    --sink-conf sink.label-prefix=label


3.2 Standalone Environment

cd flink-1.17.2-bin
bin/flink run -t remote \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-25.1.0.jar \
    postgres-sync-database \
    --database test_db \
    --postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
    --postgres-conf port=5432 \
    --postgres-conf username=admin \
    --postgres-conf password=123456 \
    --postgres-conf database-name=test_db \
    --including-tables "student" \
    --sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
    --sink-conf sink.label-prefix=label


3.3 Yarn Environment (Per-Job Mode)

cd flink-1.17.2-bin
bin/flink run -t yarn-per-job \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-25.1.0.jar \
    postgres-sync-database \
    --database test_db \
    --postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
    --postgres-conf port=5432 \
    --postgres-conf username=admin \
    --postgres-conf password=123456 \
    --postgres-conf database-name=test_db \
    --including-tables "student" \
    --sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
    --sink-conf sink.label-prefix=label


3.4 Kubernetes Environment (Session Mode)

cd flink-1.17.2-bin
bin/flink run -t kubernetes-session \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-25.1.0.jar \
    postgres-sync-database \
    --database test_db \
    --postgres-conf hostname=database-test.xxx.us-east-1.rds.amazonaws.com \
    --postgres-conf port=5432 \
    --postgres-conf username=admin \
    --postgres-conf password=123456 \
    --postgres-conf database-name=test_db \
    --including-tables "student" \
    --sink-conf fenodes=lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://lb-40579077-a97732bc6c030909.elb.us-east-1.amazonaws.com:9030 \
    --sink-conf sink.label-prefix=label

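The four commands differ only in the `-t` target. To avoid copy-paste drift (such as a stray MySQL port), you could wrap the shared arguments in one function. This is a sketch, not part of the connector: `submit_sync_job` and the environment variables `FLINK_HOME`, `PG_HOST`, `PG_PASSWORD`, `FE_HOST` and `VELODB_PASSWORD` are hypothetical names. The flags themselves are copied from the job commands, with the port and database name set as described in the notes before them.

```bash
# Hypothetical wrapper around the postgres-sync-database job; only -t varies by target.
# Required (hypothetical) environment variables: PG_HOST, PG_PASSWORD, FE_HOST,
# VELODB_PASSWORD. FLINK_HOME defaults to the current directory.
submit_sync_job() {
  local target="$1" var
  case "$target" in
    local|remote|yarn-per-job|kubernetes-session) ;;
    *) echo "error: unsupported target '$target'" >&2; return 1 ;;
  esac

  # Refuse to build the command if any required variable is missing.
  for var in PG_HOST PG_PASSWORD FE_HOST VELODB_PASSWORD; do
    if [[ -z "${!var:-}" ]]; then
      echo "error: $var is not set" >&2
      return 1
    fi
  done

  local flink_home="${FLINK_HOME:-.}"
  local -a cmd=(
    "$flink_home/bin/flink" run -t "$target"
    -Dexecution.checkpointing.interval=10s
    -Dparallelism.default=1
    -c org.apache.doris.flink.tools.cdc.CdcTools
    "$flink_home/lib/flink-doris-connector-1.17-25.1.0.jar"
    postgres-sync-database
    --database test_db
    --postgres-conf "hostname=$PG_HOST"
    --postgres-conf port=5432
    --postgres-conf username=admin
    --postgres-conf "password=$PG_PASSWORD"
    --postgres-conf database-name=test_db
    --including-tables student
    --sink-conf "fenodes=$FE_HOST:8080"
    --sink-conf username=admin
    --sink-conf "password=$VELODB_PASSWORD"
    --sink-conf "jdbc-url=jdbc:mysql://$FE_HOST:9030"
    --sink-conf sink.label-prefix=label
  )

  # DRY_RUN=1 prints a shell-quoted copy of the command instead of submitting it.
  if [[ "${DRY_RUN:-0}" == "1" ]]; then
    printf '%q ' "${cmd[@]}"
    echo
  else
    "${cmd[@]}"
  fi
}
```

`DRY_RUN=1 submit_sync_job yarn-per-job` prints the full command for review; drop `DRY_RUN` to actually submit it.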

💡 Tip: For more Flink Doris Connector parameters, check the official documentation.

Step 4: Verify Data Sync

4.1 Verify Full Historical Data Sync

The Flink job will first sync all existing data from PostgreSQL to VeloDB. Connect to your VeloDB warehouse and query the student table to confirm the data is present.
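For a quick sanity check of the initial load, you can compare row counts on both sides (the source table holds 5 rows at this point). Sketch assumptions: `psql` and the `mysql` client are installed on a host that can reach both systems, `PG_HOST` and `FE_HOST` are hypothetical variables, the login is the `admin` user from the job example, and passwords come from `PGPASSWORD` and the MySQL option file (`~/.my.cnf`). VeloDB is queried over the MySQL protocol port 9030 used in the job's `jdbc-url`.

```bash
# Sketch: compare source and sink row counts after the full load.
# Assumptions: hypothetical PG_HOST / FE_HOST variables; credentials supplied
# through PGPASSWORD and ~/.my.cnf rather than on the command line.
compare_student_counts() {
  local src dst
  # -t: tuples only, -A: unaligned output, so psql prints just the number.
  src="$(psql -h "$PG_HOST" -p 5432 -U admin -d test_db -tAc \
    'SELECT count(*) FROM public.student')" || return 1
  # -N: no column names, -B: tab-separated batch output.
  dst="$(mysql -h "$FE_HOST" -P 9030 -u admin -N -B -e \
    'SELECT COUNT(*) FROM test_db.student')" || return 1
  echo "postgres=$src velodb=$dst"
  [[ "$src" == "$dst" ]]
}
```

Matching counts don't prove the rows are identical, only that nothing was obviously dropped during the initial load.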

4.2 Verify Real-Time Incremental Sync

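To capture DELETE operations (required for full incremental sync), first set the table's replica identity to FULL. With the default setting, PostgreSQL writes only the primary-key columns of a deleted row to the WAL; FULL makes it log the entire old row:

ALTER TABLE public.student REPLICA IDENTITY FULL;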


📚 Reference: PostgreSQL Replica Identity Documentation
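To double-check that the change took effect, you can read `relreplident` from the `pg_class` catalog, where `f` means FULL (`d` is the default, `n` nothing, `i` index). A minimal sketch, assuming a hypothetical `PG_HOST` variable, the `admin` login from the job example, and a password supplied via `PGPASSWORD`:

```bash
# Sketch: verify that public.student uses REPLICA IDENTITY FULL.
# Assumptions: psql is installed; PG_HOST is a hypothetical variable.
check_replica_identity() {
  local ident
  ident="$(psql -h "$PG_HOST" -p 5432 -U admin -d test_db -tAc \
    "SELECT relreplident FROM pg_class WHERE oid = 'public.student'::regclass")" || return 1
  if [[ "$ident" == "f" ]]; then
    echo "replica identity: FULL"
  else
    echo "replica identity is '$ident', expected 'f' (FULL)" >&2
    return 1
  fi
}
```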

Now, modify data in PostgreSQL:

INSERT INTO student (id, name, age, email, phone, score, created_at) 
VALUES 
(6, 'Frank Zhao', 24, 'frank@example.com', '13400134000', 88.75, NOW());

DELETE FROM student WHERE id = 3;

UPDATE student 
SET score = 95.00, age = 23 
WHERE id = 2;


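Query the student table in VeloDB again to confirm the changes arrived in near real time: the row with id 6 should be present, the row with id 3 should be gone, and the row with id 2 should now show age 23 and score 95.00.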
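With the connector's default two-phase commit, the Doris sink makes data visible when Flink completes a checkpoint (every 10 s in the job commands), so changes can take roughly one checkpoint interval to appear. The polling sketch below waits for all three changes. Its query returns 0 rows for id 3 (deleted), 1 for id 6 (inserted), and 1 for id 2 with the new score. `wait_for_incremental_sync`, `FE_HOST`, and the `admin` login with credentials in `~/.my.cnf` are assumptions.

```bash
# Sketch: poll VeloDB until the INSERT, DELETE and UPDATE above are visible.
# Usage: wait_for_incremental_sync [attempts] [seconds_between_attempts]
# Assumptions: mysql client installed; FE_HOST is a hypothetical variable.
wait_for_incremental_sync() {
  local attempts="${1:-30}" interval="${2:-2}" i out
  local query='SELECT
      SUM(CASE WHEN id = 3 THEN 1 ELSE 0 END),
      SUM(CASE WHEN id = 6 THEN 1 ELSE 0 END),
      SUM(CASE WHEN id = 2 AND score = 95.00 THEN 1 ELSE 0 END)
    FROM test_db.student'
  for ((i = 1; i <= attempts; i++)); do
    out="$(mysql -h "$FE_HOST" -P 9030 -u admin -N -B -e "$query")" || return 1
    # Expected: id 3 gone (0), id 6 present (1), id 2 updated (1), tab-separated.
    if [[ "$out" == $'0\t1\t1' ]]; then
      echo "changes synced after $i attempt(s)"
      return 0
    fi
    sleep "$interval"
  done
  echo "changes not visible after $attempts attempts (last result: $out)" >&2
  return 1
}
```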

Common Pitfalls to Avoid

  1. PostgreSQL Port Mismatch: Don’t use 3306 (the MySQL port) for PostgreSQL; use 5432 instead.

  2. Logical Replication Not Enabled: Without rds.logical_replication = 1 (or wal_level = logical on self-hosted PostgreSQL), CDC events won’t be captured.

  3. Replica Identity Missing: For DELETE operations, REPLICA IDENTITY FULL is required (otherwise, deletes won’t sync).

  4. Network Connectivity: Ensure Flink can reach Aurora-PostgreSQL (5432) and VeloDB (8080, 9030) via security groups/VPC.
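Before submitting the job, a quick reachability test from the Flink host can rule out pitfall 4. This sketch relies on bash's `/dev/tcp` pseudo-device and the coreutils `timeout` command (both assumptions about your environment); `PG_HOST` and `FE_HOST` are hypothetical variables. It only shows that a TCP connection can be opened, not that credentials are valid.

```bash
# Sketch: check TCP reachability of the ports the pipeline needs.
# Assumptions: bash built with /dev/tcp support; coreutils `timeout` available.
check_port() {
  local host="$1" port="$2"
  # Open a TCP connection in a child bash; give up after 3 seconds.
  if timeout 3 bash -c 'exec 3<>"/dev/tcp/$1/$2"' _ "$host" "$port" 2>/dev/null; then
    echo "ok      $host:$port"
  else
    echo "FAILED  $host:$port" >&2
    return 1
  fi
}

check_pipeline_ports() {
  local rc=0
  check_port "$PG_HOST" 5432 || rc=1  # Aurora-PostgreSQL
  check_port "$FE_HOST" 8080 || rc=1  # VeloDB HTTP port (fenodes)
  check_port "$FE_HOST" 9030 || rc=1  # VeloDB MySQL protocol port (jdbc-url)
  return "$rc"
}
```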

Wrapping Up

Flink CDC provides a robust, real-time way to sync data from PostgreSQL/Aurora to VeloDB, covering both full data loads and incremental changes. By following this guide, you can set up a reliable sync pipeline with minimal effort. If you run into issues, check the Flink and VeloDB logs for details, or refer to the official documentation for additional parameters.

Happy syncing! 🚀
