Introduction
When building modern data platforms, one of the most common challenges is replicating relational database data into a data lake for analytics. AWS Database Migration Service (DMS) provides a powerful solution for continuous data replication from PostgreSQL to S3, complete with Change Data Capture (CDC) capabilities.
In this article, I'll show you how to build a production-ready data pipeline using AWS CDK (TypeScript) that:
- Replicates PostgreSQL data to S3 in Parquet format
 - Automatically catalogs tables using AWS Glue
 - Enables SQL queries via Amazon Athena
 - Supports both full load and ongoing CDC
 
Architecture Overview
Our solution follows a layered architecture pattern:
PostgreSQL (RDS) 
    ↓
AWS DMS (Serverless)
    ↓
S3 Data Lake (Parquet)
    ↓
AWS Glue Catalog
    ↓
Amazon Athena
Key Components
- DMS Replication: Handles full load and CDC from PostgreSQL
 - S3 Data Lake: Stores data in compressed Parquet format
 - Glue Data Catalog: Provides schema discovery and metadata management
 - Athena Workgroups: Enables SQL queries on the data lake
 
Prerequisites
Before starting, ensure you have:
- AWS CDK installed (npm install -g aws-cdk)
 - An existing VPC with private subnets
 - A PostgreSQL database with logical replication enabled
 - Database credentials stored in AWS Secrets Manager
 - TypeScript knowledge
 
Implementation
Step 1: Base S3 Construct
First, we create a secure S3 bucket for our data lake:
import { Construct } from 'constructs';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as iam from 'aws-cdk-lib/aws-iam';
import { RemovalPolicy } from 'aws-cdk-lib';
export interface S3BaseProps {
    bucketName?: string;
    removalPolicy?: RemovalPolicy;
    iamRolesProps?: iam.RoleProps[];
}
export class S3Base extends Construct {
    public readonly bucket: s3.Bucket;
    public readonly iamRoles: iam.Role[];
    constructor(scope: Construct, id: string, props: S3BaseProps = {}) {
        super(scope, id);
        this.iamRoles = [];
        this.bucket = new s3.Bucket(this, `instance-${id}`, {
            bucketName: props.bucketName,
            versioned: true,
            blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
            encryption: s3.BucketEncryption.S3_MANAGED,
            enforceSSL: true,
            removalPolicy: props.removalPolicy ?? RemovalPolicy.RETAIN,
            autoDeleteObjects: false,
        });
        // Create IAM roles for DMS access
        if (props.iamRolesProps && props.iamRolesProps.length > 0) {
            props.iamRolesProps.forEach((iamRolesProp, index) => {
                const role = new iam.Role(this, `bucket-${id}-${index}-role`, iamRolesProp);
                this.bucket.grantReadWrite(role);
                this.iamRoles.push(role);
            });
        }
    }
}
Step 2: DMS Base Construct
The DMS construct handles all replication configuration:
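import { Construct } from 'constructs';
import * as dms from 'aws-cdk-lib/aws-dms';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as iam from 'aws-cdk-lib/aws-iam';
// IProps is a shared props interface that is not shown here; it is assumed to declare
// envName, projectName, replicationConfigIdentifier and selectedTables
export interface DmsBaseProps extends IProps {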
    vpcTag: string;
    subnetsIds?: string[];
    securityGroup?: ec2.ISecurityGroup;
    s3Base: S3Base;
    databaseName: string;
    port?: number;
    secretsManagerSecretId: string;
    minCapacityUnits?: number;
    maxCapacityUnits?: number;
    multiAz?: boolean;
    dmsVpcRole?: iam.Role;
    dmsCloudWatchLogsRole?: iam.Role;
    dmsSecretsAccessRole?: iam.Role;
}
export class DmsBase extends Construct {
    public readonly sourceEndpoint: dms.CfnEndpoint;
    public readonly targetEndpoint: dms.CfnEndpoint;
    public readonly replicationConfig: dms.CfnReplicationConfig;
    // Kept as fields because createReplicationConfig() reads them later
    private readonly subnetGroup: dms.CfnReplicationSubnetGroup;
    private readonly securityGroup: ec2.ISecurityGroup;
    constructor(scope: Construct, id: string, props: DmsBaseProps) {
        super(scope, id);
        // Lookup VPC
        const vpc = ec2.Vpc.fromLookup(this, 'Vpc', {
            tags: { Name: props.vpcTag }
        });
        // Create or use provided IAM roles
        // (the create* helper methods are not shown in this listing)
        const vpcRole = props.dmsVpcRole ?? this.createVpcRole(id);
        const secretsAccessRole = props.dmsSecretsAccessRole ??
            this.createSecretsAccessRole(id, props.envName);
        // Create subnet group; DMS needs dms-vpc-role to exist first
        this.subnetGroup = this.createSubnetGroup(vpc, props);
        this.subnetGroup.node.addDependency(vpcRole);
        // Create security group if not provided
        this.securityGroup = props.securityGroup ??
            this.createSecurityGroup(vpc, id);
        // Create source endpoint (PostgreSQL)
        this.sourceEndpoint = new dms.CfnEndpoint(this, `${id}-pg-source`, {
            endpointIdentifier: `${props.replicationConfigIdentifier}-pg-source`,
            endpointType: 'source',
            engineName: 'postgres',
            databaseName: props.databaseName,
            postgreSqlSettings: {
                secretsManagerSecretId: props.secretsManagerSecretId,
                secretsManagerAccessRoleArn: secretsAccessRole.roleArn,
            },
            extraConnectionAttributes: [
                'pluginName=pgoutput',
                'captureDdls=false',
                'heartbeatEnable=true',
                'heartbeatFrequency=1',
            ].join(';'),
            sslMode: 'require',
        });
        // Create target endpoint (S3)
        this.targetEndpoint = this.createS3TargetEndpoint(props);
        // Create replication config (serverless)
        this.replicationConfig = this.createReplicationConfig(props);
    }
    private createS3TargetEndpoint(props: DmsBaseProps): dms.CfnEndpoint {
        return new dms.CfnEndpoint(this, 'S3Target', {
            endpointIdentifier: `${props.replicationConfigIdentifier}-s3-target`,
            endpointType: 'target',
            engineName: 's3',
            s3Settings: {
                serviceAccessRoleArn: props.s3Base.iamRoles[0].roleArn,
                bucketName: props.s3Base.bucket.bucketName,
                bucketFolder: 'rds-cdc',
                dataFormat: 'parquet',
                compressionType: 'gzip',
                parquetVersion: 'parquet-2-0',
                includeOpForFullLoad: true,
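                // Note: with cdcInsertsAndUpdates enabled, DMS writes only INSERTs and UPDATEs
                // to the CDC files; set it to false if you also need DELETE rows (Op = 'D')
                cdcInsertsAndUpdates: true,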
                timestampColumnName: '__dms_ts',
                datePartitionEnabled: false,
                // Unified file structure for easier querying
                cdcPath: 'public',
            },
        });
    }
    private createReplicationConfig(props: DmsBaseProps): dms.CfnReplicationConfig {
        return new dms.CfnReplicationConfig(this, 'ReplicationConfig', {
            replicationConfigIdentifier: props.replicationConfigIdentifier,
            replicationType: 'full-load-and-cdc',
            sourceEndpointArn: this.sourceEndpoint.ref,
            targetEndpointArn: this.targetEndpoint.ref,
            computeConfig: {
                maxCapacityUnits: props.maxCapacityUnits ?? 4,
                minCapacityUnits: props.minCapacityUnits ?? 2,
                multiAz: props.multiAz ?? true,
                replicationSubnetGroupId: this.subnetGroup.ref,
                vpcSecurityGroupIds: [this.securityGroup.securityGroupId],
            },
            replicationSettings: {
                Logging: {
                    EnableLogging: true,
                    LogComponents: [
                        { Id: 'SOURCE_CAPTURE', Severity: 'LOGGER_SEVERITY_INFO' },
                        { Id: 'TARGET_APPLY', Severity: 'LOGGER_SEVERITY_INFO' },
                    ],
                },
            },
            tableMappings: this.createTableMappings(props.selectedTables),
        });
    }
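    private createTableMappings(tables: string[]): string {
        // DMS expects each rule-id to be a unique numeric value, so number rules by position
        const rules = tables.map((tableName, index) => ({
            'rule-type': 'selection',
            'rule-id': String(index + 1),
            'rule-name': tableName,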
            'object-locator': {
                'schema-name': 'public',
                'table-name': tableName,
            },
            'rule-action': 'include',
        }));
        return JSON.stringify({ rules });
    }
}
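To see what the table mappings look like outside the construct, here is a minimal standalone sketch. The names buildTableMappings, SelectionRule, and exampleMappings are hypothetical, and the de-duplication step and the `include-` rule-name prefix are assumptions added for illustration; the rule fields themselves mirror the ones used in createTableMappings above.

```typescript
// Hypothetical standalone helper; not part of the constructs above.
interface SelectionRule {
    'rule-type': 'selection';
    'rule-id': string;
    'rule-name': string;
    'object-locator': { 'schema-name': string; 'table-name': string };
    'rule-action': 'include';
}

function buildTableMappings(tables: string[], schemaName = 'public'): string {
    // Drop duplicate table names so every table yields exactly one rule.
    const uniqueTables = Array.from(new Set(tables));
    const rules: SelectionRule[] = uniqueTables.map((tableName, index) => ({
        'rule-type': 'selection',
        // Number rules by position (1, 2, 3, ...) so each rule-id is unique and numeric.
        'rule-id': String(index + 1),
        'rule-name': `include-${tableName}`,
        'object-locator': { 'schema-name': schemaName, 'table-name': tableName },
        'rule-action': 'include',
    }));
    return JSON.stringify({ rules });
}

// 'users' appears twice on purpose; only two rules are generated.
const exampleMappings = buildTableMappings(['users', 'orders', 'users']);
```

The resulting string is the kind of value that would be passed as tableMappings on the replication config.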
Step 3: Glue Catalog Base
The Glue catalog provides metadata management and schema discovery:
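import { Construct } from 'constructs';
import { Stack } from 'aws-cdk-lib';
import * as glue from 'aws-cdk-lib/aws-glue';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as s3 from 'aws-cdk-lib/aws-s3';
export interface GlueBaseCatalogProps {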
    envName: string;
    projectName: string;
    s3DataBucket: s3.Bucket;
    cdcPath: string;
    catalogSuffix: string;
    useGlueCrawler?: boolean;
    crawlerSchedule?: string;
}
export abstract class GlueBaseCatalog extends Construct {
    public readonly database: glue.CfnDatabase;
    public readonly crawler?: glue.CfnCrawler;
    protected readonly databaseName: string;
    constructor(scope: Construct, id: string, props: GlueBaseCatalogProps) {
        super(scope, id);
        this.databaseName = `${props.projectName}_${props.envName}_${props.catalogSuffix}`;
        // Create Glue Database
        this.database = new glue.CfnDatabase(this, 'Database', {
            catalogId: Stack.of(this).account,
            databaseInput: {
                name: this.databaseName,
                description: this.getDatabaseDescription(),
            },
        });
        // Create crawler for automatic schema discovery
        if (props.useGlueCrawler) {
            this.crawler = this.createCrawler(props);
        }
    }
    private createCrawler(props: GlueBaseCatalogProps): glue.CfnCrawler {
        const crawlerRole = new iam.Role(this, 'CrawlerRole', {
            assumedBy: new iam.ServicePrincipal('glue.amazonaws.com'),
            managedPolicies: [
                iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole')
            ],
            inlinePolicies: {
                S3Access: new iam.PolicyDocument({
                    statements: [
                        new iam.PolicyStatement({
                            effect: iam.Effect.ALLOW,
                            actions: [
                                's3:GetObject',
                                's3:ListBucket',
                                's3:GetBucketLocation'
                            ],
                            resources: [
                                props.s3DataBucket.bucketArn,
                                `${props.s3DataBucket.bucketArn}/*`
                            ]
                        })
                    ]
                })
            }
        });
        const s3Target = `s3://${props.s3DataBucket.bucketName}/rds-cdc/public/`;
        return new glue.CfnCrawler(this, 'Crawler', {
            name: `${props.projectName}-${props.envName}-${props.catalogSuffix}-crawler`,
            role: crawlerRole.roleArn,
            databaseName: this.databaseName,
            targets: {
                s3Targets: [{
                    path: s3Target,
                    exclusions: ['**/_tmp/**', '**/.tmp/**']
                }]
            },
            configuration: JSON.stringify({
                Version: 1.0,
                CrawlerOutput: {
                    Partitions: { AddOrUpdateBehavior: 'InheritFromTable' },
                    Tables: { AddOrUpdateBehavior: 'MergeNewColumns' }
                },
                Grouping: {
                    TableGroupingPolicy: 'CombineCompatibleSchemas',
                    TableLevelConfiguration: 4
                }
            }),
            schemaChangePolicy: {
                updateBehavior: 'UPDATE_IN_DATABASE',
                deleteBehavior: 'LOG'
            },
            ...(props.crawlerSchedule && {
                schedule: {
                    scheduleExpression: props.crawlerSchedule
                }
            })
        });
    }
    protected abstract getDatabaseDescription(): string;
    protected abstract getTableList(): string[];
    public getDatabaseName(): string {
        return this.databaseName;
    }
}
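The constructor above builds the database name by joining projectName, envName, and catalogSuffix with underscores. A hyphenated project name such as 'my-project' would leave a hyphen in the result, which tends to be awkward to reference in Athena SQL without quoting. The sketch below shows one possible way to normalize the name first. The function toGlueDatabaseName and the suffix 'source_db' are hypothetical, chosen only to illustrate the idea.

```typescript
// Hypothetical helper: builds a lowercase, underscore-only database name.
function toGlueDatabaseName(projectName: string, envName: string, catalogSuffix: string): string {
    return [projectName, envName, catalogSuffix]
        .join('_')
        .toLowerCase()
        // Replace anything other than a-z, 0-9 and underscore with an underscore.
        .replace(/[^a-z0-9_]/g, '_');
}

// 'source_db' is an assumed suffix used only for this example.
const exampleDatabaseName = toGlueDatabaseName('my-project', 'production', 'source_db');
// exampleDatabaseName is 'my_project_production_source_db'
```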
Step 4: Athena Workgroup
Athena provides SQL query capabilities:
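import { Construct } from 'constructs';
import { Duration, RemovalPolicy } from 'aws-cdk-lib';
import * as athena from 'aws-cdk-lib/aws-athena';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as s3 from 'aws-cdk-lib/aws-s3';
import { BlockPublicAccess } from 'aws-cdk-lib/aws-s3';
export interface AthenaBaseProps {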
    envName: string;
    projectName: string;
    glueDatabase: string;
    s3DataBucket: s3.Bucket;
    workgroupSuffix: string;
}
export abstract class AthenaBase extends Construct {
    public readonly workGroup: athena.CfnWorkGroup;
    public readonly queryResultsBucket: s3.Bucket;
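    // props is stored as a private field because createAthenaUserRole() reads this.props
    constructor(scope: Construct, id: string, private readonly props: AthenaBaseProps) {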
        super(scope, id);
        const workgroupName = `${props.projectName}-${props.envName}-${props.workgroupSuffix}`;
        // Create S3 bucket for query results
        this.queryResultsBucket = new s3.Bucket(this, 'QueryResultsBucket', {
            bucketName: `${props.projectName}-athena-${props.workgroupSuffix}-results-${props.envName}`,
            removalPolicy: RemovalPolicy.DESTROY,
            autoDeleteObjects: true,
            blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
            versioned: true,
            lifecycleRules: [{
                id: 'DeleteOldQueryResults',
                enabled: true,
                expiration: Duration.days(30),
            }],
        });
        // Create Athena WorkGroup
        this.workGroup = new athena.CfnWorkGroup(this, 'WorkGroup', {
            name: workgroupName,
            description: this.getWorkGroupDescription(),
            state: 'ENABLED',
            workGroupConfiguration: {
                resultConfiguration: {
                    outputLocation: `s3://${this.queryResultsBucket.bucketName}/query-results/`,
                    encryptionConfiguration: {
                        encryptionOption: 'SSE_S3',
                    },
                },
                enforceWorkGroupConfiguration: true,
                bytesScannedCutoffPerQuery: 1000000000, // 1GB limit
            },
        });
    }
    protected abstract getWorkGroupDescription(): string;
    public createAthenaUserRole(roleName: string): iam.Role {
        return new iam.Role(this, `AthenaUserRole-${roleName}`, {
            roleName: `${roleName}-athena-${this.props.workgroupSuffix}-${this.props.envName}`,
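            // Adjust the principal to whoever should actually assume this role (for example, your analysts' account)
            assumedBy: new iam.ServicePrincipal('athena.amazonaws.com'),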
            description: `Role for Athena users to query data`,
            inlinePolicies: {
                AthenaQueryPolicy: new iam.PolicyDocument({
                    statements: [
                        new iam.PolicyStatement({
                            effect: iam.Effect.ALLOW,
                            actions: [
                                'athena:StartQueryExecution',
                                'athena:GetQueryExecution',
                                'athena:GetQueryResults',
                            ],
                            resources: [`arn:aws:athena:*:*:workgroup/${this.workGroup.name}`],
                        }),
                        new iam.PolicyStatement({
                            effect: iam.Effect.ALLOW,
                            actions: [
                                'glue:GetDatabase',
                                'glue:GetTable',
                                'glue:GetPartitions',
                            ],
                            resources: ['*'],
                        }),
                        new iam.PolicyStatement({
                            effect: iam.Effect.ALLOW,
                            actions: ['s3:GetObject', 's3:ListBucket'],
                            resources: [
                                this.props.s3DataBucket.bucketArn,
                                `${this.props.s3DataBucket.bucketArn}/*`,
                            ],
                        }),
                    ],
                }),
            },
        });
    }
}
Step 5: Putting It All Together
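Here's how to use these constructs in your stack. GlueSourceCatalog, SourceDatabaseDms, and AthenaSource are concrete subclasses of the base constructs above (not shown here); they are assumed to supply the suffixes, table list, and descriptions: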
export class DataLakeStack extends Stack {
    constructor(scope: Construct, id: string, props?: StackProps) {
        super(scope, id, props);
        // 1. Create required IAM roles (DMS requires specific names)
        const dmsVpcRole = new iam.Role(this, 'DmsVpcRole', {
            roleName: 'dms-vpc-role', // Required exact name
            assumedBy: new iam.CompositePrincipal(
                new iam.ServicePrincipal('dms.amazonaws.com'),
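                // Regional service principal; replace us-east-1 with your region (here and in the secrets role below)
                new iam.ServicePrincipal('dms.us-east-1.amazonaws.com')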
            ),
            managedPolicies: [
                iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AmazonDMSVPCManagementRole')
            ],
        });
        const dmsCloudWatchLogsRole = new iam.Role(this, 'DmsCloudWatchLogsRole', {
            roleName: 'dms-cloudwatch-logs-role', // Required exact name
            assumedBy: new iam.ServicePrincipal('dms.amazonaws.com'),
            managedPolicies: [
                iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AmazonDMSCloudWatchLogsRole')
            ],
        });
        const dmsSecretsAccessRole = new iam.Role(this, 'DmsSecretsAccessRole', {
            roleName: `dms-secrets-access-role-production`,
            assumedBy: new iam.CompositePrincipal(
                new iam.ServicePrincipal('dms.amazonaws.com'),
                new iam.ServicePrincipal('dms.us-east-1.amazonaws.com')
            ),
            inlinePolicies: {
                SecretsRead: new iam.PolicyDocument({
                    statements: [
                        new iam.PolicyStatement({
                            actions: ['secretsmanager:GetSecretValue'],
                            // Consider scoping this to the ARN of the database secret for least privilege
                            resources: ['*'],
                        }),
                    ],
                }),
            },
        });
        // 2. Create S3 data lake
        const s3Base = new S3Base(this, 'S3DmsBase', {
            bucketName: `my-project-production-dms-data-lake`,
            iamRolesProps: [{
                assumedBy: new iam.ServicePrincipal('dms.amazonaws.com'),
                description: 'Role for DMS to write to S3',
            }]
        });
        // 3. Create Glue Catalog
        const glueCatalog = new GlueSourceCatalog(this, 'GlueCatalog', {
            envName: 'production',
            projectName: 'my-project',
            s3DataBucket: s3Base.bucket,
            cdcPath: 'source-database/',
            useAutomaticSchemaDiscovery: true,
        });
        // 4. Create DMS replication
        new SourceDatabaseDms(this, 'SourceDatabaseDms', {
            envName: 'production',
            projectName: 'my-project',
            s3Base,
            databaseName: 'mydb',
            secretsManagerSecretId: 'my-db-secret',
            replicationConfigIdentifier: 'source-database-production',
            vpcTag: 'main-vpc',
            dmsVpcRole,
            dmsCloudWatchLogsRole,
            dmsSecretsAccessRole,
            glueCatalog,
        });
        // 5. Create Athena workgroup
        const athenaSource = new AthenaSource(this, 'AthenaSource', {
            envName: 'production',
            projectName: 'my-project',
            glueDatabase: glueCatalog.getDatabaseName(),
            s3DataBucket: s3Base.bucket,
        });
        // 6. Create Athena user role
        athenaSource.createAthenaUserRole('DataAnalyst');
    }
}
Configuration Details
Table Selection
Define which tables to replicate in a constants file:
export const selectedTables = [
    'users',
    'orders',
    'products',
    'customers',
    // ... more tables
];
export const cdcPath = '<instance-name>-database/';
PostgreSQL Configuration
Ensure your PostgreSQL database has these settings:
-- Enable logical replication
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;
-- Restart PostgreSQL after making these changes
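On Amazon RDS, make these changes through a DB parameter group rather than ALTER SYSTEM: set rds.logical_replication to 1 (which switches wal_level to logical), then reboot the instance.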
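Before starting a replication, it can help to confirm that the three settings above actually took effect. The sketch below assumes you have already read the values from the database (for example with SHOW wal_level;) and checks them in code. The names ReplicationSettings and findReplicationProblems are hypothetical, and the minimum of one slot and one WAL sender is an assumption for a single replication.

```typescript
// Hypothetical shape for values read from the database with SHOW <setting>.
interface ReplicationSettings {
    wal_level: string;
    max_replication_slots: number;
    max_wal_senders: number;
}

// Returns a list of human-readable problems; an empty list means the settings look usable.
function findReplicationProblems(settings: ReplicationSettings): string[] {
    const problems: string[] = [];
    if (settings.wal_level !== 'logical') {
        problems.push(`wal_level is '${settings.wal_level}', expected 'logical'`);
    }
    if (settings.max_replication_slots < 1) {
        problems.push('max_replication_slots must be at least 1');
    }
    if (settings.max_wal_senders < 1) {
        problems.push('max_wal_senders must be at least 1');
    }
    return problems;
}

// Example: a database still running with the default 'replica' WAL level.
const exampleProblems = findReplicationProblems({
    wal_level: 'replica',
    max_replication_slots: 10,
    max_wal_senders: 10,
});
```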
Secrets Manager Format
Your database secret should have this structure:
{
  "username": "dbuser",
  "password": "securepassword",
  "host": "mydb.region.rds.amazonaws.com",
  "port": 5432,
  "dbname": "mydb"
}
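A malformed secret usually only shows up when the DMS endpoint fails to connect, so it may be worth validating the structure earlier. The following is a minimal sketch of a type guard that mirrors the fields shown above. The names DmsDatabaseSecret and isDmsDatabaseSecret are hypothetical, and the strictness of the checks (non-empty strings, numeric port) is an assumption.

```typescript
// Mirrors the secret structure shown above; the interface name is hypothetical.
interface DmsDatabaseSecret {
    username: string;
    password: string;
    host: string;
    port: number;
    dbname: string;
}

// Type guard: true only when every expected field is present with the expected type.
function isDmsDatabaseSecret(value: unknown): value is DmsDatabaseSecret {
    if (typeof value !== 'object' || value === null) {
        return false;
    }
    const candidate = value as Record<string, unknown>;
    const hasStrings = ['username', 'password', 'host', 'dbname'].every(
        key => typeof candidate[key] === 'string' && candidate[key] !== ''
    );
    const port = candidate['port'];
    const hasPort = typeof port === 'number' && Number.isInteger(port) && port > 0 && port <= 65535;
    return hasStrings && hasPort;
}

// Parse the sample secret from the article and check it.
const exampleSecret: unknown = JSON.parse(
    '{"username":"dbuser","password":"securepassword","host":"mydb.region.rds.amazonaws.com","port":5432,"dbname":"mydb"}'
);
const exampleSecretIsValid = isDmsDatabaseSecret(exampleSecret);
```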
Key Features & Best Practices
1. Serverless DMS
We use DMS Serverless with auto-scaling:
- Min capacity: 2 DCU
 - Max capacity: 4 DCU
 - Multi-AZ for high availability
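The capacity bounds above map to minCapacityUnits and maxCapacityUnits in the replication config. As far as I know, DMS Serverless accepts only a fixed set of DCU values, so a small check at synth time can catch a typo early. This is a sketch under that assumption: the list below reflects the CloudFormation documentation as I understand it and should be verified, and the names VALID_DCU_VALUES and checkCapacityRange are hypothetical.

```typescript
// Assumed list of valid DCU values; verify against the current AWS documentation.
const VALID_DCU_VALUES: number[] = [1, 2, 4, 8, 16, 32, 64, 128, 192, 256, 384];

// Returns a list of problems with the given capacity range; empty means it looks valid.
function checkCapacityRange(minCapacityUnits: number, maxCapacityUnits: number): string[] {
    const problems: string[] = [];
    if (VALID_DCU_VALUES.indexOf(minCapacityUnits) === -1) {
        problems.push(`minCapacityUnits ${minCapacityUnits} is not a recognized DCU value`);
    }
    if (VALID_DCU_VALUES.indexOf(maxCapacityUnits) === -1) {
        problems.push(`maxCapacityUnits ${maxCapacityUnits} is not a recognized DCU value`);
    }
    if (minCapacityUnits > maxCapacityUnits) {
        problems.push('minCapacityUnits must not exceed maxCapacityUnits');
    }
    return problems;
}

// The defaults used in this article: 2 to 4 DCU.
const defaultRangeProblems = checkCapacityRange(2, 4);
```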
 
2. Parquet Format
DMS writes data in Parquet format with:
- GZIP compression for storage efficiency
 - Operation column (Op) for CDC tracking (I=Insert, U=Update, D=Delete)
 - Timestamp column (__dms_ts) for change tracking
3. S3 Structure
Data is organized as:
s3://bucket/rds-cdc/public/tablename/
  ├── LOAD00000001.parquet
  ├── 20231001-12345.parquet
  └── 20231002-67890.parquet
I recommend keeping this layout, with one folder per table, so that Glue crawlers can discover each table automatically.
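To make the layout concrete, here is a small sketch that splits an object key into schema, table, and file kind. It assumes the folder structure shown above and that full-load files start with LOAD while every other Parquet file is a CDC file. The names ParsedDmsKey and parseDmsKey are hypothetical.

```typescript
// Hypothetical result type for a parsed object key.
interface ParsedDmsKey {
    schema: string;
    table: string;
    fileName: string;
    kind: 'full-load' | 'cdc';
}

// Parses keys of the form <bucketFolder>/<schema>/<table>/<file>.parquet.
// Returns undefined for anything that does not match that layout.
function parseDmsKey(key: string, bucketFolder = 'rds-cdc'): ParsedDmsKey | undefined {
    const parts = key.split('/');
    const [folder, schema, table, fileName] = parts;
    if (parts.length !== 4 || folder !== bucketFolder || !schema || !table || !fileName) {
        return undefined;
    }
    if (!fileName.endsWith('.parquet')) {
        return undefined;
    }
    // Assumption: full-load files are named LOAD<number>.parquet.
    const kind: ParsedDmsKey['kind'] = fileName.startsWith('LOAD') ? 'full-load' : 'cdc';
    return { schema, table, fileName, kind };
}

const exampleFullLoad = parseDmsKey('rds-cdc/public/users/LOAD00000001.parquet');
const exampleCdc = parseDmsKey('rds-cdc/public/users/20231001-12345.parquet');
```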
4. Glue Crawler
The crawler runs automatically:
- Production: Daily at 2 AM (adjust the schedule to suit your case)
 - Automatically discovers new tables and schema changes
 - Updates the Glue Data Catalog
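The schedule is passed to the crawler through the crawlerSchedule prop as a cron expression, which Glue evaluates in UTC. A minimal sketch of building the daily 2 AM expression follows; the helper name dailyCrawlerSchedule is hypothetical. Note that the stack example in Step 5 does not set crawlerSchedule, so you would need to pass a value like this for the crawler to run on a schedule.

```typescript
// Hypothetical helper: builds a daily Glue cron expression for the given UTC time.
function dailyCrawlerSchedule(hourUtc: number, minute = 0): string {
    if (!Number.isInteger(hourUtc) || hourUtc < 0 || hourUtc > 23) {
        throw new RangeError(`hourUtc must be an integer from 0 to 23, got ${hourUtc}`);
    }
    if (!Number.isInteger(minute) || minute < 0 || minute > 59) {
        throw new RangeError(`minute must be an integer from 0 to 59, got ${minute}`);
    }
    // Field order: minutes, hours, day-of-month, month, day-of-week, year.
    return `cron(${minute} ${hourUtc} * * ? *)`;
}

const twoAmSchedule = dailyCrawlerSchedule(2);
// twoAmSchedule is 'cron(0 2 * * ? *)'
```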
 
5. Security
Multiple layers of security:
- VPC endpoints for private connectivity
 - IAM roles with least privilege
 - S3 bucket encryption (SSE-S3)
 - SSL/TLS for database connections
 - Secrets Manager for credentials
 
Querying Data with Athena
Once deployed, you can query your data in Athena using SQL:
-- View recent changes
SELECT *
FROM my_project_production_source_db.users
WHERE __dms_ts > current_timestamp - interval '1' day
ORDER BY __dms_ts DESC;
-- Get current state (latest record for each ID)
WITH latest_records AS (
  SELECT *, 
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY __dms_ts DESC) as rn
  FROM my_project_production_source_db.users
)
SELECT *
FROM latest_records
WHERE rn = 1 AND Op != 'D'; -- Exclude deleted records
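The "current state" query above keeps the newest row per id and then drops rows whose last operation was a delete. The same logic can be sketched in plain TypeScript, which may help when reasoning about edge cases. The names ChangeRow and currentState, the email column, and the sample rows are hypothetical, and the sketch assumes timestamps share one fixed-width string format so that string comparison orders them by time.

```typescript
// Hypothetical row shape: one change record as written by DMS.
interface ChangeRow {
    id: number;
    Op: 'I' | 'U' | 'D';
    __dms_ts: string; // fixed-width timestamp string, so string comparison follows time order
    email: string;
}

// Keeps the newest row per id, then removes ids whose newest row is a delete.
function currentState(rows: ChangeRow[]): ChangeRow[] {
    const latestById = new Map<number, ChangeRow>();
    for (const row of rows) {
        const seen = latestById.get(row.id);
        if (seen === undefined || row.__dms_ts > seen.__dms_ts) {
            latestById.set(row.id, row);
        }
    }
    return Array.from(latestById.values()).filter(row => row.Op !== 'D');
}

// Sample change history: user 1 is inserted then updated, user 2 is inserted then deleted.
const exampleChanges: ChangeRow[] = [
    { id: 1, Op: 'I', __dms_ts: '2023-10-01 12:00:00.000000', email: 'old@example.com' },
    { id: 2, Op: 'I', __dms_ts: '2023-10-01 12:00:00.000000', email: 'gone@example.com' },
    { id: 1, Op: 'U', __dms_ts: '2023-10-02 09:30:00.000000', email: 'new@example.com' },
    { id: 2, Op: 'D', __dms_ts: '2023-10-03 08:00:00.000000', email: 'gone@example.com' },
];
const exampleCurrentState = currentState(exampleChanges);
```

Only the updated row for user 1 remains, since the newest record for user 2 is a delete.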
Cost Optimization
- DMS Serverless: Only pay for active replication time
 - S3 Lifecycle: Automatically delete query results after 30 days
 - Parquet + Gzip: Reduces storage by roughly 70% compared to CSV, though the exact saving depends on your data
 - Athena Query Limits: 1GB scan limit per query prevents runaway costs
 
Monitoring
CloudWatch provides comprehensive monitoring:
- DMS replication lag
 - S3 bucket metrics
 - Glue crawler runs
 - Athena query metrics
 
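Create alarms on key metrics such as replication latency:
// CfnReplicationConfig is an L1 construct without metric helpers, so define the metric explicitly
new cloudwatch.Alarm(this, 'DmsLatencyAlarm', {
    metric: new cloudwatch.Metric({
        namespace: 'AWS/DMS',
        metricName: 'CDCLatencyTarget', // replication lag in seconds
        // Add a dimensionsMap that identifies your replication; check CloudWatch for the exact dimension names
    }),
    threshold: 300, // 5 minutes
    evaluationPeriods: 2,
});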
Deployment
Deploy your stack:
# Bootstrap CDK (first time only)
cdk bootstrap
# Synthesize CloudFormation template
cdk synth
# Deploy
cdk deploy DataLakeStack  # replace DataLakeStack with your stack name
# After deployment, start the DMS replication in AWS Console
# or use AWS CLI:
aws dms start-replication \
    --replication-config-arn <arn> \
    --start-replication-type start-replication
Common Issues & Solutions
Issue 1: DMS Can't Connect to PostgreSQL
Solution: Ensure RDS security group allows inbound from DMS security group on port 5432.
Issue 2: Tables Not Appearing in Glue
Solution: Run the Glue crawler manually first, then check CloudWatch logs.
Issue 3: Athena Queries Return No Results
Solution: Verify S3 path in Glue table matches actual data location.
Issue 4: DMS Replication Fails
Solution: Check that PostgreSQL has wal_level=logical and sufficient replication slots.
Conclusion
This architecture provides a robust, scalable solution for replicating PostgreSQL data to a data lake. The combination of DMS, Glue, and Athena creates a powerful analytics platform that:
- Scales automatically with your data
 - Provides near real-time data availability
 - Supports complex analytical queries
 - Maintains cost efficiency
 - Offers production-grade reliability
 
The modular CDK design makes it easy to extend with additional databases, customize table selection, or add data transformations.
This architecture has been battle-tested in production environments handling millions of records daily. Feel free to adapt it to your specific needs!
    