DEV Community

André Paris

Building a Production-Ready Data Lake: PostgreSQL to S3 with AWS DMS, Glue, and Athena using CDK

Introduction

When building modern data platforms, one of the most common challenges is replicating relational database data into a data lake for analytics. AWS Database Migration Service (DMS) provides a powerful solution for continuous data replication from PostgreSQL to S3, complete with Change Data Capture (CDC) capabilities.

In this article, I'll show you how to build a production-ready data pipeline using AWS CDK (TypeScript) that:

  • Replicates PostgreSQL data to S3 in Parquet format
  • Automatically catalogs tables using AWS Glue
  • Enables SQL queries via Amazon Athena
  • Supports both full load and ongoing CDC

Architecture Overview

Our solution follows a layered architecture pattern:

PostgreSQL (RDS) 
    ↓
AWS DMS (Serverless)
    ↓
S3 Data Lake (Parquet)
    ↓
AWS Glue Catalog
    ↓
Amazon Athena

Key Components

  1. DMS Replication: Handles full load and CDC from PostgreSQL
  2. S3 Data Lake: Stores data in compressed Parquet format
  3. Glue Data Catalog: Provides schema discovery and metadata management
  4. Athena Workgroups: Enables SQL queries on the data lake

Prerequisites

Before starting, ensure you have:

  • AWS CDK installed (npm install -g aws-cdk)
  • An existing VPC with private subnets
  • A PostgreSQL database with logical replication enabled
  • Database credentials stored in AWS Secrets Manager
  • TypeScript knowledge

Implementation

Step 1: Base S3 Construct

First, we create a secure S3 bucket for our data lake:

import { Construct } from 'constructs';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as iam from 'aws-cdk-lib/aws-iam';
import { RemovalPolicy } from 'aws-cdk-lib';

export interface S3BaseProps {
    bucketName?: string;
    removalPolicy?: RemovalPolicy;
    iamRolesProps?: iam.RoleProps[];
}

export class S3Base extends Construct {
    public readonly bucket: s3.Bucket;
    public readonly iamRoles: iam.Role[];

    constructor(scope: Construct, id: string, props: S3BaseProps = {}) {
        super(scope, id);

        this.iamRoles = [];
        this.bucket = new s3.Bucket(this, `instance-${id}`, {
            bucketName: props.bucketName,
            versioned: true,
            blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
            encryption: s3.BucketEncryption.S3_MANAGED,
            enforceSSL: true,
            removalPolicy: props.removalPolicy ?? RemovalPolicy.RETAIN,
            autoDeleteObjects: false,
        });

        // Create IAM roles for DMS access
        if (props.iamRolesProps && props.iamRolesProps.length > 0) {
            props.iamRolesProps.forEach((iamRolesProp, index) => {
                const role = new iam.Role(this, `bucket-${id}-${index}-role`, iamRolesProp);
                this.bucket.grantReadWrite(role);
                this.iamRoles.push(role);
            });
        }
    }
}
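S3 bucket names are global, and here the name is assembled from project and environment strings, so a bad name usually only surfaces when CloudFormation rejects the bucket. A small synth-time check can catch it earlier. The sketch below uses a hypothetical validateBucketName helper and covers the main general purpose bucket naming rules (3 to 63 characters; lowercase letters, digits, dots and hyphens; starts and ends with a letter or digit; no adjacent dots; not shaped like an IP address). It is not an exhaustive implementation of every S3 naming rule.

```typescript
// Hypothetical helper: returns a list of rule violations (empty when the name looks valid).
// Covers the common S3 general purpose bucket naming rules; not an exhaustive check.
function validateBucketName(name: string): string[] {
    const problems: string[] = [];
    if (name.length < 3 || name.length > 63) {
        problems.push('length must be between 3 and 63 characters');
    }
    if (!/^[a-z0-9.-]+$/.test(name)) {
        problems.push('only lowercase letters, digits, dots and hyphens are allowed');
    }
    if (!/^[a-z0-9]/.test(name) || !/[a-z0-9]$/.test(name)) {
        problems.push('must begin and end with a lowercase letter or digit');
    }
    if (name.includes('..')) {
        problems.push('must not contain two adjacent dots');
    }
    if (/^\d{1,3}(\.\d{1,3}){3}$/.test(name)) {
        problems.push('must not be formatted as an IP address');
    }
    return problems;
}

// Usage: fail fast at synth time with a readable message
const candidate = 'my-project-production-dms-data-lake';
const issues = validateBucketName(candidate);
if (issues.length > 0) {
    throw new Error(`Invalid bucket name "${candidate}": ${issues.join('; ')}`);
}
```

Running the check before constructing S3Base keeps the error message close to the code that built the name.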

Step 2: DMS Base Construct

The DMS construct handles all replication configuration:

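import { Construct } from 'constructs';
import { Stack } from 'aws-cdk-lib';
import * as dms from 'aws-cdk-lib/aws-dms';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as iam from 'aws-cdk-lib/aws-iam';
import { S3Base } from './s3-base'; // module path is illustrative; point it at your Step 1 file

export interface DmsBaseProps {
    // Project/environment fields, inlined here in place of the shared IProps interface
    envName: string;
    projectName: string;
    replicationConfigIdentifier: string;
    selectedTables: string[];
    vpcTag: string;
    subnetsIds?: string[];
    securityGroup?: ec2.ISecurityGroup;
    s3Base: S3Base;
    databaseName: string;
    port?: number;
    secretsManagerSecretId: string;
    minCapacityUnits?: number;
    maxCapacityUnits?: number;
    multiAz?: boolean;
    dmsVpcRole?: iam.IRole;
    dmsCloudWatchLogsRole?: iam.IRole;
    dmsSecretsAccessRole?: iam.IRole;
}

export class DmsBase extends Construct {
    public readonly sourceEndpoint: dms.CfnEndpoint;
    public readonly targetEndpoint: dms.CfnEndpoint;
    public readonly replicationConfig: dms.CfnReplicationConfig;
    // Stored as fields because createReplicationConfig() reads them
    private readonly subnetGroup: dms.CfnReplicationSubnetGroup;
    private readonly securityGroup: ec2.ISecurityGroup;

    constructor(scope: Construct, id: string, props: DmsBaseProps) {
        super(scope, id);

        // Lookup VPC (Vpc.fromLookup needs an explicit account/region on the stack)
        const vpc = ec2.Vpc.fromLookup(this, 'Vpc', {
            tags: { Name: props.vpcTag }
        });

        // Create or use provided IAM roles
        const vpcRole = props.dmsVpcRole ?? this.createVpcRole();
        const secretsAccessRole = props.dmsSecretsAccessRole ??
            this.createSecretsAccessRole(props.envName);

        // Create subnet group; DMS needs dms-vpc-role in place before it can create one
        this.subnetGroup = this.createSubnetGroup(vpc, props);
        this.subnetGroup.node.addDependency(vpcRole);

        // Create security group if not provided
        this.securityGroup = props.securityGroup ??
            this.createSecurityGroup(vpc, id);

        // Create source endpoint (PostgreSQL)
        this.sourceEndpoint = new dms.CfnEndpoint(this, `${id}-pg-source`, {
            endpointIdentifier: `${props.replicationConfigIdentifier}-pg-source`,
            endpointType: 'source',
            engineName: 'postgres',
            databaseName: props.databaseName,
            postgreSqlSettings: {
                secretsManagerSecretId: props.secretsManagerSecretId,
                secretsManagerAccessRoleArn: secretsAccessRole.roleArn,
            },
            extraConnectionAttributes: [
                'pluginName=pgoutput',
                'captureDdls=false',
                'heartbeatEnable=true',
                'heartbeatFrequency=1',
            ].join(';'),
            sslMode: 'require',
        });

        // Create target endpoint (S3)
        this.targetEndpoint = this.createS3TargetEndpoint(props);

        // Create replication config (serverless)
        this.replicationConfig = this.createReplicationConfig(props);

        // Logging needs dms-cloudwatch-logs-role to exist before the replication starts
        if (props.dmsCloudWatchLogsRole) {
            this.replicationConfig.node.addDependency(props.dmsCloudWatchLogsRole);
        }
    }

    private createS3TargetEndpoint(props: DmsBaseProps): dms.CfnEndpoint {
        return new dms.CfnEndpoint(this, 'S3Target', {
            endpointIdentifier: `${props.replicationConfigIdentifier}-s3-target`,
            endpointType: 'target',
            engineName: 's3',
            s3Settings: {
                serviceAccessRoleArn: props.s3Base.iamRoles[0].roleArn,
                bucketName: props.s3Base.bucket.bucketName,
                // Objects land under <bucketFolder>/<schema>/<table>/, e.g. rds-cdc/public/users/
                bucketFolder: 'rds-cdc',
                dataFormat: 'parquet',
                compressionType: 'gzip',
                parquetVersion: 'parquet-2-0',
                // Full-load rows get an Op column too (value I)
                includeOpForFullLoad: true,
                // true writes only inserts and updates; set to false if you need
                // delete records (Op = 'D') in the lake
                cdcInsertsAndUpdates: true,
                timestampColumnName: '__dms_ts',
                datePartitionEnabled: false,
                // On an S3 target, cdcPath is only used when preserveTransactions is true;
                // the public/<table>/ folders come from the schema and table names
                cdcPath: 'public',
            },
        });
    }

    private createReplicationConfig(props: DmsBaseProps): dms.CfnReplicationConfig {
        return new dms.CfnReplicationConfig(this, 'ReplicationConfig', {
            replicationConfigIdentifier: props.replicationConfigIdentifier,
            replicationType: 'full-load-and-cdc',
            sourceEndpointArn: this.sourceEndpoint.ref,
            targetEndpointArn: this.targetEndpoint.ref,
            computeConfig: {
                maxCapacityUnits: props.maxCapacityUnits ?? 4,
                minCapacityUnits: props.minCapacityUnits ?? 2,
                multiAz: props.multiAz ?? true,
                replicationSubnetGroupId: this.subnetGroup.ref,
                vpcSecurityGroupIds: [this.securityGroup.securityGroupId],
            },
            replicationSettings: {
                Logging: {
                    EnableLogging: true,
                    LogComponents: [
                        { Id: 'SOURCE_CAPTURE', Severity: 'LOGGER_SEVERITY_INFO' },
                        { Id: 'TARGET_APPLY', Severity: 'LOGGER_SEVERITY_INFO' },
                    ],
                },
            },
            // AWS::DMS::ReplicationConfig takes TableMappings as a JSON object
            tableMappings: this.createTableMappings(props.selectedTables),
        });
    }

    private createTableMappings(tables: string[]): Record<string, unknown> {
        const rules = tables.map((tableName, index) => ({
            'rule-type': 'selection',
            // DMS expects a unique numeric rule-id
            'rule-id': String(index + 1),
            'rule-name': tableName,
            'object-locator': {
                'schema-name': 'public',
                'table-name': tableName,
            },
            'rule-action': 'include',
        }));

        return { rules };
    }

    private createVpcRole(): iam.Role {
        // DMS looks this role up by its exact name
        return new iam.Role(this, 'DmsVpcRole', {
            roleName: 'dms-vpc-role',
            assumedBy: new iam.ServicePrincipal('dms.amazonaws.com'),
            managedPolicies: [
                iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AmazonDMSVPCManagementRole'),
            ],
        });
    }

    private createSecretsAccessRole(envName: string): iam.Role {
        const role = new iam.Role(this, 'DmsSecretsAccessRole', {
            roleName: `dms-secrets-access-role-${envName}`,
            // Secrets Manager access roles must trust the regional DMS principal
            assumedBy: new iam.CompositePrincipal(
                new iam.ServicePrincipal('dms.amazonaws.com'),
                new iam.ServicePrincipal(`dms.${Stack.of(this).region}.amazonaws.com`),
            ),
        });
        role.addToPolicy(new iam.PolicyStatement({
            actions: ['secretsmanager:GetSecretValue'],
            resources: ['*'], // narrow this to your secret's ARN where possible
        }));
        return role;
    }

    private createSubnetGroup(vpc: ec2.IVpc, props: DmsBaseProps): dms.CfnReplicationSubnetGroup {
        const subnetIds = props.subnetsIds ??
            vpc.selectSubnets({ subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS }).subnetIds;
        return new dms.CfnReplicationSubnetGroup(this, 'SubnetGroup', {
            replicationSubnetGroupDescription: `Subnets for ${props.replicationConfigIdentifier}`,
            subnetIds,
        });
    }

    private createSecurityGroup(vpc: ec2.IVpc, id: string): ec2.SecurityGroup {
        return new ec2.SecurityGroup(this, `${id}-sg`, {
            vpc,
            description: 'DMS Serverless replication',
            allowAllOutbound: true, // reach PostgreSQL, Secrets Manager and S3
        });
    }
}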
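The mapping builder in DmsBase includes tables one by one. DMS selection rules can also exclude objects, and % works as a wildcard in object-locator names. As a sketch, assuming a hypothetical buildTableMappings helper, the following produces the same JSON shape with unique numeric rule ids and optional exclusions:

```typescript
type SelectionRule = {
    'rule-type': 'selection';
    'rule-id': string;
    'rule-name': string;
    'object-locator': { 'schema-name': string; 'table-name': string };
    'rule-action': 'include' | 'exclude';
};

// Hypothetical helper: include the listed tables (% acts as a wildcard in DMS
// object-locator names) and exclude anything matching the exclude patterns.
function buildTableMappings(
    schema: string,
    include: string[],
    exclude: string[] = [],
): { rules: SelectionRule[] } {
    const entries: Array<[string, 'include' | 'exclude']> = [
        ...include.map((t): [string, 'include'] => [t, 'include']),
        ...exclude.map((t): [string, 'exclude'] => [t, 'exclude']),
    ];
    const rules = entries.map(([table, action], index): SelectionRule => ({
        'rule-type': 'selection',
        'rule-id': String(index + 1), // unique numeric id per rule
        'rule-name': `${action}-${index + 1}`,
        'object-locator': { 'schema-name': schema, 'table-name': table },
        'rule-action': action,
    }));
    return { rules };
}

// Usage: replicate every table in public except those whose names start with "audit"
const mappings = buildTableMappings('public', ['%'], ['audit%']);
console.log(JSON.stringify(mappings, null, 2));
```

The returned object can be passed as tableMappings. Listing tables explicitly, as the article does, is easier to review; wildcards trade that for less maintenance when new tables appear.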

Step 3: Glue Catalog Base

The Glue catalog provides metadata management and schema discovery:

import { Construct } from 'constructs';
import { Stack } from 'aws-cdk-lib';
import * as glue from 'aws-cdk-lib/aws-glue';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as s3 from 'aws-cdk-lib/aws-s3';

export interface GlueBaseCatalogProps {
    envName: string;
    projectName: string;
    s3DataBucket: s3.Bucket;
    cdcPath: string;
    catalogSuffix: string;
    useGlueCrawler?: boolean;
    crawlerSchedule?: string;
}

export abstract class GlueBaseCatalog extends Construct {
    public readonly database: glue.CfnDatabase;
    public readonly crawler?: glue.CfnCrawler;
    protected readonly databaseName: string;

    constructor(scope: Construct, id: string, props: GlueBaseCatalogProps) {
        super(scope, id);

        // Athena works best with underscore-only names, so hyphens become underscores
        // (my-project -> my_project_production_source_db)
        this.databaseName = `${props.projectName}_${props.envName}_${props.catalogSuffix}`
            .replace(/-/g, '_');

        // Create Glue Database
        this.database = new glue.CfnDatabase(this, 'Database', {
            catalogId: Stack.of(this).account,
            databaseInput: {
                name: this.databaseName,
                description: this.getDatabaseDescription(),
            },
        });

        // Create crawler for automatic schema discovery
        if (props.useGlueCrawler) {
            const crawler = this.createCrawler(props);
            // The crawler references the database by name only, so make the order explicit
            crawler.node.addDependency(this.database);
            this.crawler = crawler;
        }
    }

    private createCrawler(props: GlueBaseCatalogProps): glue.CfnCrawler {
        const crawlerRole = new iam.Role(this, 'CrawlerRole', {
            assumedBy: new iam.ServicePrincipal('glue.amazonaws.com'),
            managedPolicies: [
                iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole')
            ],
            inlinePolicies: {
                S3Access: new iam.PolicyDocument({
                    statements: [
                        new iam.PolicyStatement({
                            effect: iam.Effect.ALLOW,
                            actions: [
                                's3:GetObject',
                                's3:ListBucket',
                                's3:GetBucketLocation'
                            ],
                            resources: [
                                props.s3DataBucket.bucketArn,
                                `${props.s3DataBucket.bucketArn}/*`
                            ]
                        })
                    ]
                })
            }
        });

        const s3Target = `s3://${props.s3DataBucket.bucketName}/rds-cdc/public/`;

        return new glue.CfnCrawler(this, 'Crawler', {
            name: `${props.projectName}-${props.envName}-${props.catalogSuffix}-crawler`,
            role: crawlerRole.roleArn,
            databaseName: this.databaseName,
            targets: {
                s3Targets: [{
                    path: s3Target,
                    exclusions: ['**/_tmp/**', '**/.tmp/**']
                }]
            },
            configuration: JSON.stringify({
                Version: 1.0,
                CrawlerOutput: {
                    Partitions: { AddOrUpdateBehavior: 'InheritFromTable' },
                    Tables: { AddOrUpdateBehavior: 'MergeNewColumns' }
                },
                Grouping: {
                    TableGroupingPolicy: 'CombineCompatibleSchemas',
                    // The bucket is level 1, so rds-cdc/public/<table>/ is level 4
                    TableLevelConfiguration: 4
                }
            }),
            schemaChangePolicy: {
                updateBehavior: 'UPDATE_IN_DATABASE',
                deleteBehavior: 'LOG'
            },
            // Only schedule the crawler when an expression is provided (Glue cron runs in UTC)
            schedule: props.crawlerSchedule
                ? { scheduleExpression: props.crawlerSchedule }
                : undefined,
        });
    }

    protected abstract getDatabaseDescription(): string;
    protected abstract getTableList(): string[];

    public getDatabaseName(): string {
        return this.databaseName;
    }
}
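TableLevelConfiguration: 4 is easy to get wrong. Glue counts the bucket as level 1, so with the include path s3://bucket/rds-cdc/public/ the bucket, rds-cdc and public occupy levels 1 to 3 and each table folder sits at level 4. The hypothetical tableLevelFor helper below does that arithmetic for any include path, which is handy if you change bucketFolder:

```typescript
// Hypothetical helper: the TableLevelConfiguration value that makes each folder directly
// under `includePath` its own table. Glue counts the bucket itself as level 1.
function tableLevelFor(includePath: string): number {
    const withoutScheme = includePath.replace(/^s3:\/\//, '');
    // segments = [bucket, ...prefix folders]; tables live one level deeper
    const segments = withoutScheme.split('/').filter((s) => s.length > 0);
    return segments.length + 1;
}

console.log(tableLevelFor('s3://my-bucket/rds-cdc/public/')); // 4
```

Deriving the value from the same path string used for s3Targets keeps the two settings from drifting apart.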

Step 4: Athena Workgroup

Athena provides SQL query capabilities:

import { Construct } from 'constructs';
import { Duration, RemovalPolicy, Stack } from 'aws-cdk-lib';
import * as athena from 'aws-cdk-lib/aws-athena';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as s3 from 'aws-cdk-lib/aws-s3';

export interface AthenaBaseProps {
    envName: string;
    projectName: string;
    glueDatabase: string;
    s3DataBucket: s3.Bucket;
    workgroupSuffix: string;
}

export abstract class AthenaBase extends Construct {
    public readonly workGroup: athena.CfnWorkGroup;
    public readonly queryResultsBucket: s3.Bucket;
    // Kept on the instance because createAthenaUserRole() reads it later
    protected readonly props: AthenaBaseProps;

    constructor(scope: Construct, id: string, props: AthenaBaseProps) {
        super(scope, id);
        this.props = props;

        const workgroupName = `${props.projectName}-${props.envName}-${props.workgroupSuffix}`;

        // Create S3 bucket for query results
        this.queryResultsBucket = new s3.Bucket(this, 'QueryResultsBucket', {
            bucketName: `${props.projectName}-athena-${props.workgroupSuffix}-results-${props.envName}`,
            removalPolicy: RemovalPolicy.DESTROY,
            autoDeleteObjects: true,
            blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
            encryption: s3.BucketEncryption.S3_MANAGED,
            enforceSSL: true,
            // No versioning: with it enabled, expiration only adds delete markers and
            // old result files would be kept as noncurrent versions
            versioned: false,
            lifecycleRules: [{
                id: 'DeleteOldQueryResults',
                enabled: true,
                expiration: Duration.days(30),
            }],
        });

        // Create Athena WorkGroup
        this.workGroup = new athena.CfnWorkGroup(this, 'WorkGroup', {
            name: workgroupName,
            description: this.getWorkGroupDescription(),
            state: 'ENABLED',
            workGroupConfiguration: {
                resultConfiguration: {
                    outputLocation: `s3://${this.queryResultsBucket.bucketName}/query-results/`,
                    encryptionConfiguration: {
                        encryptionOption: 'SSE_S3',
                    },
                },
                enforceWorkGroupConfiguration: true,
                bytesScannedCutoffPerQuery: 1_000_000_000, // ~1 GB (10^9 bytes) per query
            },
        });
    }

    protected abstract getWorkGroupDescription(): string;

    public createAthenaUserRole(roleName: string): iam.Role {
        const stack = Stack.of(this);
        const db = this.props.glueDatabase;

        const role = new iam.Role(this, `AthenaUserRole-${roleName}`, {
            roleName: `${roleName}-athena-${this.props.workgroupSuffix}-${this.props.envName}`,
            // Assumed by IAM users/roles in this account; the Athena service never assumes it
            assumedBy: new iam.AccountRootPrincipal(),
            description: 'Role for Athena users to query data',
        });

        role.addToPolicy(new iam.PolicyStatement({
            actions: [
                'athena:StartQueryExecution',
                'athena:GetQueryExecution',
                'athena:GetQueryResults',
                'athena:StopQueryExecution',
                'athena:GetWorkGroup',
            ],
            resources: [stack.formatArn({ service: 'athena', resource: 'workgroup', resourceName: this.workGroup.name })],
        }));

        role.addToPolicy(new iam.PolicyStatement({
            actions: ['glue:GetDatabase', 'glue:GetTable', 'glue:GetTables', 'glue:GetPartition', 'glue:GetPartitions'],
            resources: [
                stack.formatArn({ service: 'glue', resource: 'catalog' }),
                stack.formatArn({ service: 'glue', resource: 'database', resourceName: db }),
                stack.formatArn({ service: 'glue', resource: 'table', resourceName: `${db}/*` }),
            ],
        }));

        // Read the data lake, and read/write the workgroup's query-results location
        this.props.s3DataBucket.grantRead(role);
        this.queryResultsBucket.grantReadWrite(role);

        return role;
    }
}
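For review, or for accounts managed outside CDK, it can help to see the Athena user permissions as a plain IAM policy document. This sketch assumes a hypothetical athenaUserPolicy helper and placeholder account, region and bucket names; the S3 actions are a minimal hand-picked set, not the exact action list CDK's grantRead and grantReadWrite emit.

```typescript
interface Statement {
    Effect: 'Allow';
    Action: string[];
    Resource: string[];
}

// Hypothetical helper: a least-privilege policy for querying one workgroup and one Glue database.
function athenaUserPolicy(opts: {
    partition?: string;
    region: string;
    account: string;
    workgroup: string;
    glueDatabase: string;
    dataBucket: string;
    resultsBucket: string;
}): { Version: string; Statement: Statement[] } {
    const p = opts.partition ?? 'aws';
    const scope = `${opts.region}:${opts.account}`;
    const bucket = (name: string) => [`arn:${p}:s3:::${name}`, `arn:${p}:s3:::${name}/*`];
    return {
        Version: '2012-10-17',
        Statement: [
            {
                Effect: 'Allow',
                Action: [
                    'athena:StartQueryExecution', 'athena:GetQueryExecution',
                    'athena:GetQueryResults', 'athena:StopQueryExecution', 'athena:GetWorkGroup',
                ],
                Resource: [`arn:${p}:athena:${scope}:workgroup/${opts.workgroup}`],
            },
            {
                Effect: 'Allow',
                Action: ['glue:GetDatabase', 'glue:GetTable', 'glue:GetTables', 'glue:GetPartition', 'glue:GetPartitions'],
                Resource: [
                    `arn:${p}:glue:${scope}:catalog`,
                    `arn:${p}:glue:${scope}:database/${opts.glueDatabase}`,
                    `arn:${p}:glue:${scope}:table/${opts.glueDatabase}/*`,
                ],
            },
            // Read-only access to the data lake
            { Effect: 'Allow', Action: ['s3:GetObject', 's3:ListBucket', 's3:GetBucketLocation'], Resource: bucket(opts.dataBucket) },
            // Athena writes query results here on the user's behalf
            { Effect: 'Allow', Action: ['s3:GetObject', 's3:PutObject', 's3:ListBucket', 's3:GetBucketLocation'], Resource: bucket(opts.resultsBucket) },
        ],
    };
}

console.log(JSON.stringify(athenaUserPolicy({
    region: 'us-east-1', account: '123456789012', workgroup: 'my-project-production-source',
    glueDatabase: 'my_project_production_source_db', dataBucket: 'my-data-bucket', resultsBucket: 'my-results-bucket',
}), null, 2));
```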

Step 5: Putting It All Together

Here's how to use these constructs in your stack:

import { Stack, StackProps } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager';
// Also import S3Base and your GlueSourceCatalog, SourceDatabaseDms and AthenaSource classes
// (project-specific subclasses of the base constructs above)

export class DataLakeStack extends Stack {
    constructor(scope: Construct, id: string, props?: StackProps) {
        // Pass props.env (account and region): Vpc.fromLookup in DmsBase requires it
        super(scope, id, props);

        // 1. Create required IAM roles. DMS looks up dms-vpc-role and dms-cloudwatch-logs-role
        //    by these exact names. Role names are unique per account, so if the roles
        //    already exist (e.g. created by the DMS console), import them with iam.Role.fromRoleName.
        const dmsVpcRole = new iam.Role(this, 'DmsVpcRole', {
            roleName: 'dms-vpc-role', // Required exact name
            assumedBy: new iam.ServicePrincipal('dms.amazonaws.com'),
            managedPolicies: [
                iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AmazonDMSVPCManagementRole')
            ],
        });

        const dmsCloudWatchLogsRole = new iam.Role(this, 'DmsCloudWatchLogsRole', {
            roleName: 'dms-cloudwatch-logs-role', // Required exact name
            assumedBy: new iam.ServicePrincipal('dms.amazonaws.com'),
            managedPolicies: [
                iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AmazonDMSCloudWatchLogsRole')
            ],
        });

        // The Secrets Manager access role trusts the regional DMS principal and can read
        // only the database secret (add kms:Decrypt if it uses a customer managed KMS key)
        const dbSecret = secretsmanager.Secret.fromSecretNameV2(this, 'DbSecret', 'my-db-secret');
        const dmsSecretsAccessRole = new iam.Role(this, 'DmsSecretsAccessRole', {
            roleName: `dms-secrets-access-role-production`,
            assumedBy: new iam.CompositePrincipal(
                new iam.ServicePrincipal('dms.amazonaws.com'),
                new iam.ServicePrincipal(`dms.${this.region}.amazonaws.com`)
            ),
        });
        dbSecret.grantRead(dmsSecretsAccessRole);

        // 2. Create S3 data lake
        const s3Base = new S3Base(this, 'S3DmsBase', {
            bucketName: `my-project-production-dms-data-lake`,
            iamRolesProps: [{
                assumedBy: new iam.ServicePrincipal('dms.amazonaws.com'),
                description: 'Role for DMS to write to S3',
            }]
        });

        // 3. Create Glue Catalog
        const glueCatalog = new GlueSourceCatalog(this, 'GlueCatalog', {
            envName: 'production',
            projectName: 'my-project',
            s3DataBucket: s3Base.bucket,
            cdcPath: 'source-database/',
            useAutomaticSchemaDiscovery: true,
        });

        // 4. Create DMS replication
        new SourceDatabaseDms(this, 'SourceDatabaseDms', {
            envName: 'production',
            projectName: 'my-project',
            s3Base,
            databaseName: 'mydb',
            secretsManagerSecretId: 'my-db-secret',
            replicationConfigIdentifier: 'source-database-production',
            vpcTag: 'main-vpc',
            dmsVpcRole,
            dmsCloudWatchLogsRole,
            dmsSecretsAccessRole,
            glueCatalog,
        });

        // 5. Create Athena workgroup
        const athenaSource = new AthenaSource(this, 'AthenaSource', {
            envName: 'production',
            projectName: 'my-project',
            glueDatabase: glueCatalog.getDatabaseName(),
            s3DataBucket: s3Base.bucket,
        });

        // 6. Create Athena user role
        athenaSource.createAthenaUserRole('DataAnalyst');
    }
}
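The Athena examples later in this article query my_project_production_source_db, while bucket and workgroup names keep hyphens. Deriving every name from one place keeps them consistent across constructs. The hypothetical resourceNames helper below is a sketch of that idea: S3 names use hyphens, and the Glue database name is normalized to lowercase letters, digits and underscores, which is what Athena recommends for database and table names.

```typescript
// Hypothetical helper: derive consistent resource names from project, environment and suffix.
function resourceNames(project: string, env: string, suffix: string) {
    // Glue/Athena-friendly identifier: lowercase, with anything else replaced by "_"
    const glueSafe = (s: string) => s.toLowerCase().replace(/[^a-z0-9_]/g, '_');
    return {
        dataBucket: `${project}-${env}-dms-data-lake`.toLowerCase(),
        glueDatabase: glueSafe(`${project}_${env}_${suffix}`),
        athenaWorkgroup: `${project}-${env}-${suffix}`,
    };
}

const names = resourceNames('my-project', 'production', 'source_db');
console.log(names.glueDatabase); // my_project_production_source_db
```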

Configuration Details

Table Selection

Define which tables to replicate in a constants file:

export const selectedTables = [
    'users',
    'orders',
    'products',
    'customers',
    // ... more tables
];

export const cdcPath = '<instance-name>-database/';

PostgreSQL Configuration

Ensure your PostgreSQL database has these settings:

-- Enable logical replication
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;

-- Restart PostgreSQL after making these changes

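On Amazon RDS, ALTER SYSTEM is not available. Make these changes in a custom DB parameter group instead: set rds.logical_replication to 1 (RDS then sets wal_level to logical), adjust max_replication_slots and max_wal_senders if needed, attach the group to the instance, and reboot it so the static parameter takes effect.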
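Before starting the replication, a quick pre-flight check can confirm the settings took effect. The sketch assumes you have already run the pg_settings query shown in the comment with your PostgreSQL client of choice and pass the rows in; checkLogicalReplication is a hypothetical helper. Each DMS replication holds one logical replication slot and one WAL sender, so slotsNeeded should cover every replication (and any other logical consumers) pointed at the database.

```typescript
// Rows as returned by:
//   SELECT name, setting FROM pg_settings
//   WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders');
type PgSetting = { name: string; setting: string };

// Hypothetical pre-flight check; returns human-readable problems (empty when ready).
function checkLogicalReplication(rows: PgSetting[], slotsNeeded = 1): string[] {
    const settings = new Map(rows.map((r) => [r.name, r.setting] as const));
    const problems: string[] = [];
    if (settings.get('wal_level') !== 'logical') {
        problems.push(`wal_level is ${settings.get('wal_level') ?? 'unknown'}, expected logical`);
    }
    for (const name of ['max_replication_slots', 'max_wal_senders']) {
        const value = Number(settings.get(name) ?? '0');
        if (value < slotsNeeded) {
            problems.push(`${name} is ${value}, need at least ${slotsNeeded}`);
        }
    }
    return problems;
}

const problems = checkLogicalReplication([
    { name: 'wal_level', setting: 'logical' },
    { name: 'max_replication_slots', setting: '10' },
    { name: 'max_wal_senders', setting: '10' },
]);
console.log(problems.length === 0 ? 'ready for DMS' : problems.join('\n'));
```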

Secrets Manager Format

Your database secret should have this structure:

{
  "username": "dbuser",
  "password": "securepassword",
  "host": "mydb.region.rds.amazonaws.com",
  "port": 5432,
  "dbname": "mydb"
}
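If the secret is created or edited by hand, a malformed value tends to surface only as a DMS connection error. A small type guard, here a hypothetical isDbSecret, can validate the JSON shape shown above in a deployment script or test before DMS ever reads it:

```typescript
interface DbSecret {
    username: string;
    password: string;
    host: string;
    port: number;
    dbname: string;
}

// Hypothetical type guard: checks that a parsed secret has the fields shown above.
function isDbSecret(value: unknown): value is DbSecret {
    if (typeof value !== 'object' || value === null) return false;
    const v = value as Record<string, unknown>;
    return typeof v.username === 'string'
        && typeof v.password === 'string'
        && typeof v.host === 'string'
        && typeof v.dbname === 'string'
        && Number.isInteger(v.port); // port stored as a number, not "5432"
}

const raw = '{"username":"dbuser","password":"securepassword","host":"mydb.region.rds.amazonaws.com","port":5432,"dbname":"mydb"}';
console.log(isDbSecret(JSON.parse(raw))); // true
```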

Key Features & Best Practices

1. Serverless DMS

We use DMS Serverless with auto-scaling:

  • Min capacity: 2 DCU
  • Max capacity: 4 DCU
  • Multi-AZ for high availability

2. Parquet Format

DMS writes data in Parquet format with:

  • GZIP compression for storage efficiency
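  • Operation column (Op) for CDC tracking (I=Insert, U=Update, D=Delete); full-load rows get I because includeOpForFullLoad is on, and D rows are only written when cdcInsertsAndUpdates is false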
  • Timestamp column (__dms_ts) for change tracking

3. S3 Structure

Data is organized as:

s3://bucket/rds-cdc/public/tablename/
  ├── LOAD00000001.parquet
  ├── 20231001-12345.parquet
  └── 20231002-67890.parquet

I recommend keeping this one-folder-per-table layout, because it lets the Glue crawler discover each table automatically.
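With this layout every object key carries its schema and table, and full-load files start with LOAD while CDC files are named by timestamp. A hypothetical parseDmsKey helper, for example in a Lambda function that reacts to new objects, could classify keys like this. It assumes bucketFolder is a single path segment such as rds-cdc.

```typescript
type DmsObject = {
    folder: string;
    schema: string;
    table: string;
    file: string;
    kind: 'full-load' | 'cdc';
};

// Hypothetical parser for keys laid out as <bucketFolder>/<schema>/<table>/<file>.parquet
function parseDmsKey(key: string): DmsObject | undefined {
    const parts = key.split('/');
    if (parts.length !== 4 || !parts[3].endsWith('.parquet')) return undefined;
    const [folder, schema, table, file] = parts;
    // Full-load files are named LOAD00000001.parquet, LOAD00000002.parquet, ...
    return { folder, schema, table, file, kind: file.startsWith('LOAD') ? 'full-load' : 'cdc' };
}

console.log(parseDmsKey('rds-cdc/public/users/LOAD00000001.parquet')?.kind); // full-load
```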

4. Glue Crawler

The crawler runs automatically:

  • Production: daily at 2 AM UTC (adjust the schedule to your needs)
  • Automatically discovers new tables and schema changes
  • Updates the Glue Data Catalog
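Glue schedules use a six-field cron syntax (minutes, hours, day of month, month, day of week, year) evaluated in UTC, and one of the two day fields must be ?. The hypothetical dailyGlueSchedule helper below builds the daily 2 AM expression, which can then be passed as crawlerSchedule:

```typescript
// Hypothetical helper: Glue cron expression for a daily run at the given UTC time.
// Fields: minutes hours day-of-month month day-of-week year; one day field must be "?".
function dailyGlueSchedule(hourUtc: number, minute = 0): string {
    if (!Number.isInteger(hourUtc) || hourUtc < 0 || hourUtc > 23) {
        throw new RangeError('hourUtc must be an integer between 0 and 23');
    }
    if (!Number.isInteger(minute) || minute < 0 || minute > 59) {
        throw new RangeError('minute must be an integer between 0 and 59');
    }
    return `cron(${minute} ${hourUtc} * * ? *)`;
}

console.log(dailyGlueSchedule(2)); // cron(0 2 * * ? *)
```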

5. Security

Multiple layers of security:

  • VPC endpoints for private connectivity
  • IAM roles with least privilege
  • S3 bucket encryption (SSE-S3)
  • SSL/TLS for database connections
  • Secrets Manager for credentials

Querying Data with Athena

Once deployed, query your data in Athena using SQL:

-- View recent changes
SELECT *
FROM my_project_production_source_db.users
WHERE __dms_ts > current_timestamp - interval '1' day
ORDER BY __dms_ts DESC;

-- Get current state (latest record for each ID)
WITH latest_records AS (
  SELECT *, 
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY __dms_ts DESC) as rn
  FROM my_project_production_source_db.users
)
SELECT *
FROM latest_records
WHERE rn = 1 AND Op != 'D'; -- Exclude deleted records
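The same latest-state logic can be useful outside Athena, for example in tests or a small export job. This sketch mirrors the ROW_NUMBER() query: keep the newest row per id by __dms_ts, then drop deletes. The CdcRow shape and currentState name are hypothetical, and it assumes __dms_ts values are strings in a sortable 'YYYY-MM-DD HH:MM:SS.ffffff' format. As with the SQL, D rows only exist when cdcInsertsAndUpdates is disabled on the S3 endpoint.

```typescript
type CdcRow = { id: number; Op: 'I' | 'U' | 'D'; __dms_ts: string; [column: string]: unknown };

// Keep the newest row per id (like ROW_NUMBER() ... ORDER BY __dms_ts DESC = 1),
// then exclude ids whose latest change is a delete.
function currentState(rows: CdcRow[]): CdcRow[] {
    const latest = new Map<number, CdcRow>();
    for (const row of rows) {
        const seen = latest.get(row.id);
        // Lexicographic comparison works because the timestamp format is fixed-width
        if (seen === undefined || row.__dms_ts > seen.__dms_ts) {
            latest.set(row.id, row);
        }
    }
    return Array.from(latest.values()).filter((row) => row.Op !== 'D');
}

const sample: CdcRow[] = [
    { id: 1, Op: 'I', __dms_ts: '2023-10-01 10:00:00.000000' },
    { id: 1, Op: 'U', __dms_ts: '2023-10-01 11:00:00.000000' },
    { id: 2, Op: 'I', __dms_ts: '2023-10-01 10:00:00.000000' },
    { id: 2, Op: 'D', __dms_ts: '2023-10-01 12:00:00.000000' },
];
// Only id 1 remains (its update); id 2's latest change is a delete
console.log(currentState(sample).map((r) => `${r.id}:${r.Op}`));
```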

Cost Optimization

  1. DMS Serverless: Only pay for active replication time
  2. S3 Lifecycle: Automatically delete query results after 30 days
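  3. Parquet + Gzip: Columnar, compressed files are much smaller than the equivalent CSV, and Athena reads only the columns a query references
  4. Athena Query Limits: The 1 GB per-query scan cutoff cancels any query that would scan more, preventing runaway costs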
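To put the 1 GB cutoff in perspective, the worst-case cost of a single query is simple arithmetic. The sketch assumes a price of 5 USD per TB scanned (check current Athena pricing for your region), decimal units, and Athena's per-query minimum of 10 MB with scanned data rounded up to the next megabyte; estimateQueryCostUsd is a hypothetical helper.

```typescript
// Hypothetical estimator. Assumptions: 5 USD per TB scanned, decimal units,
// 10 MB minimum per query, scanned bytes rounded up to the next MB.
function estimateQueryCostUsd(bytesScanned: number, pricePerTbUsd = 5): number {
    const MB = 1_000_000;
    const billedMb = Math.max(10, Math.ceil(bytesScanned / MB));
    return (billedMb * MB / 1e12) * pricePerTbUsd;
}

// The bytesScannedCutoffPerQuery of 1_000_000_000 caps one query at about half a US cent
console.log(estimateQueryCostUsd(1_000_000_000).toFixed(4)); // 0.0050
```

The cutoff matters less for a single query than for dashboards or scripts that run many of them; multiplying this figure by the expected query count gives a rough ceiling.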

Monitoring

CloudWatch provides comprehensive monitoring:

  • DMS replication lag
  • S3 bucket metrics
  • Glue crawler runs
  • Athena query metrics

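Create alarms on the metrics that matter most, for example source CDC latency:

import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';

// CfnReplicationConfig (L1) has no metric helpers, so the metric is built directly.
// The dimension name for DMS Serverless is an assumption; confirm it in the CloudWatch console.
new cloudwatch.Alarm(this, 'DmsLatencyAlarm', {
    metric: new cloudwatch.Metric({ namespace: 'AWS/DMS', metricName: 'CDCLatencySource',
        dimensionsMap: { ReplicationConfigIdentifier: 'source-database-production' }, statistic: 'Maximum' }),
    threshold: 300, // 5 minutes (CDC latency is reported in seconds)
    evaluationPeriods: 2,
});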

Deployment

Deploy your stack:

# Bootstrap CDK (first time only)
cdk bootstrap

# Synthesize CloudFormation template
cdk synth

# Deploy
cdk deploy DataLakeStack  # replace with your stack name

# After deployment, start the DMS replication in AWS Console
# or use AWS CLI:
aws dms start-replication \
    --replication-config-arn <arn> \
    --start-replication-type start-replication

Common Issues & Solutions

Issue 1: DMS Can't Connect to PostgreSQL

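Solution: Ensure the RDS security group allows inbound traffic from the DMS security group on port 5432 (or your custom port). DMS also needs a network path from its subnets to Secrets Manager and S3, through either a NAT gateway or VPC endpoints.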

Issue 2: Tables Not Appearing in Glue

Solution: Run the Glue crawler manually once, then check its logs in CloudWatch (log group /aws-glue/crawlers).

Issue 3: Athena Queries Return No Results

Solution: Verify S3 path in Glue table matches actual data location.

Issue 4: DMS Replication Fails

Solution: Check that PostgreSQL has wal_level=logical (rds.logical_replication = 1 on RDS) and enough free replication slots and WAL senders.

Conclusion

This architecture provides a robust, scalable solution for replicating PostgreSQL data to a data lake. The combination of DMS, Glue, and Athena creates a powerful analytics platform that:

  • Scales automatically with your data
  • Provides near real-time data availability
  • Supports complex analytical queries
  • Maintains cost efficiency
  • Offers production-grade reliability

The modular CDK design makes it easy to extend with additional databases, customize table selection, or add data transformations.

This architecture has been battle-tested in production environments handling millions of records daily. Feel free to adapt it to your specific needs!
