Introduction
When building modern data platforms, one of the most common challenges is replicating relational database data into a data lake for analytics. AWS Database Migration Service (DMS) provides a powerful solution for continuous data replication from PostgreSQL to S3, complete with Change Data Capture (CDC) capabilities.
In this article, I'll show you how to build a production-ready data pipeline using AWS CDK (TypeScript) that:
- Replicates PostgreSQL data to S3 in Parquet format
- Automatically catalogs tables using AWS Glue
- Enables SQL queries via Amazon Athena
- Supports both full load and ongoing CDC
Architecture Overview
Our solution follows a layered architecture pattern:
PostgreSQL (RDS)
↓
AWS DMS (Serverless)
↓
S3 Data Lake (Parquet)
↓
AWS Glue Catalog
↓
Amazon Athena
Key Components
- DMS Replication: Handles full load and CDC from PostgreSQL
- S3 Data Lake: Stores data in compressed Parquet format
- Glue Data Catalog: Provides schema discovery and metadata management
- Athena Workgroups: Enables SQL queries on the data lake
Prerequisites
Before starting, ensure you have:
- AWS CDK installed (npm install -g aws-cdk)
- An existing VPC with private subnets
- A PostgreSQL database with logical replication enabled
- Database credentials stored in AWS Secrets Manager
- TypeScript knowledge
Implementation
Step 1: Base S3 Construct
First, we create a secure S3 bucket for our data lake:
import { Construct } from 'constructs';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as iam from 'aws-cdk-lib/aws-iam';
import { RemovalPolicy } from 'aws-cdk-lib';
export interface S3BaseProps {
bucketName?: string;
removalPolicy?: RemovalPolicy;
iamRolesProps?: iam.RoleProps[];
}
export class S3Base extends Construct {
public readonly bucket: s3.Bucket;
public readonly iamRoles: iam.Role[];
constructor(scope: Construct, id: string, props: S3BaseProps = {}) {
super(scope, id);
this.iamRoles = [];
this.bucket = new s3.Bucket(this, `instance-${id}`, {
bucketName: props.bucketName,
versioned: true,
blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
encryption: s3.BucketEncryption.S3_MANAGED,
enforceSSL: true,
removalPolicy: props.removalPolicy ?? RemovalPolicy.RETAIN,
autoDeleteObjects: false,
});
// Create IAM roles for DMS access
if (props.iamRolesProps && props.iamRolesProps.length > 0) {
props.iamRolesProps.forEach((iamRolesProp, index) => {
const role = new iam.Role(this, `bucket-${id}-${index}-role`, iamRolesProp);
this.bucket.grantReadWrite(role);
this.iamRoles.push(role);
});
}
}
}
Step 2: DMS Base Construct
The DMS construct handles all replication configuration:
export interface DmsBaseProps {
envName: string;
replicationConfigIdentifier: string;
selectedTables: string[];
vpcTag: string;
subnetsIds?: string[];
securityGroup?: ec2.ISecurityGroup;
s3Base: S3Base;
databaseName: string;
port?: number;
secretsManagerSecretId: string;
minCapacityUnits?: number;
maxCapacityUnits?: number;
multiAz?: boolean;
dmsVpcRole?: iam.Role;
dmsCloudWatchLogsRole?: iam.Role;
dmsSecretsAccessRole?: iam.Role;
}
export class DmsBase extends Construct {
public readonly sourceEndpoint: dms.CfnEndpoint;
public readonly targetEndpoint: dms.CfnEndpoint;
public readonly replicationConfig: dms.CfnReplicationConfig;
private readonly subnetGroup: dms.CfnReplicationSubnetGroup;
private readonly securityGroup: ec2.ISecurityGroup;
constructor(scope: Construct, id: string, props: DmsBaseProps) {
super(scope, id);
// Look up the VPC by its Name tag
const vpc = ec2.Vpc.fromLookup(this, 'Vpc', {
tags: { Name: props.vpcTag }
});
// Create or use provided IAM roles
const vpcRole = props.dmsVpcRole ?? this.createVpcRole(id);
const secretsAccessRole = props.dmsSecretsAccessRole ??
this.createSecretsAccessRole(id, props.envName);
// Create subnet group and security group; stored as fields because
// createReplicationConfig references them later
this.subnetGroup = this.createSubnetGroup(vpc, props);
this.securityGroup = props.securityGroup ??
this.createSecurityGroup(vpc, id);
// Create source endpoint (PostgreSQL)
this.sourceEndpoint = new dms.CfnEndpoint(this, `${id}-pg-source`, {
endpointIdentifier: `${props.replicationConfigIdentifier}-pg-source`,
endpointType: 'source',
engineName: 'postgres',
databaseName: props.databaseName,
postgreSqlSettings: {
secretsManagerSecretId: props.secretsManagerSecretId,
secretsManagerAccessRoleArn: secretsAccessRole.roleArn,
},
extraConnectionAttributes: [
'pluginName=pgoutput',
'captureDdls=false',
'heartbeatEnable=true',
'heartbeatFrequency=1',
].join(';'),
sslMode: 'require',
});
// Create target endpoint (S3)
this.targetEndpoint = this.createS3TargetEndpoint(props);
// Create replication config (serverless)
this.replicationConfig = this.createReplicationConfig(props);
}
private createS3TargetEndpoint(props: DmsBaseProps): dms.CfnEndpoint {
return new dms.CfnEndpoint(this, 'S3Target', {
endpointIdentifier: `${props.replicationConfigIdentifier}-s3-target`,
endpointType: 'target',
engineName: 's3',
s3Settings: {
serviceAccessRoleArn: props.s3Base.iamRoles[0].roleArn,
bucketName: props.s3Base.bucket.bucketName,
bucketFolder: 'rds-cdc',
dataFormat: 'parquet',
compressionType: 'gzip',
parquetVersion: 'parquet-2-0',
includeOpForFullLoad: true,
cdcInsertsAndUpdates: true,
timestampColumnName: '__dms_ts',
datePartitionEnabled: false,
// Unified file structure for easier querying
cdcPath: 'public',
},
});
}
private createReplicationConfig(props: DmsBaseProps): dms.CfnReplicationConfig {
return new dms.CfnReplicationConfig(this, 'ReplicationConfig', {
replicationConfigIdentifier: props.replicationConfigIdentifier,
replicationType: 'full-load-and-cdc',
sourceEndpointArn: this.sourceEndpoint.ref,
targetEndpointArn: this.targetEndpoint.ref,
computeConfig: {
maxCapacityUnits: props.maxCapacityUnits ?? 4,
minCapacityUnits: props.minCapacityUnits ?? 2,
multiAz: props.multiAz ?? true,
replicationSubnetGroupId: this.subnetGroup.ref,
vpcSecurityGroupIds: [this.securityGroup.securityGroupId],
},
replicationSettings: {
Logging: {
EnableLogging: true,
LogComponents: [
{ Id: 'SOURCE_CAPTURE', Severity: 'LOGGER_SEVERITY_INFO' },
{ Id: 'TARGET_APPLY', Severity: 'LOGGER_SEVERITY_INFO' },
],
},
},
tableMappings: this.createTableMappings(props.selectedTables),
});
}
private createTableMappings(tables: string[]): string {
const rules = tables.map(tableName => ({
'rule-type': 'selection',
'rule-id': tableName,
'rule-name': tableName,
'object-locator': {
'schema-name': 'public',
'table-name': tableName,
},
'rule-action': 'include',
}));
return JSON.stringify({ rules });
}
}
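To make the mapping rules concrete, here is what createTableMappings produces, as a standalone sketch you can run outside CDK. DMS expects a JSON document of selection rules; the table names below are hypothetical:

```typescript
// Shape of a single DMS selection rule (subset of the full schema)
interface SelectionRule {
  'rule-type': 'selection';
  'rule-id': string;
  'rule-name': string;
  'object-locator': { 'schema-name': string; 'table-name': string };
  'rule-action': 'include';
}

// Mirrors createTableMappings above: one include rule per table
function buildTableMappings(tables: string[], schema = 'public'): string {
  const rules: SelectionRule[] = tables.map((tableName) => ({
    'rule-type': 'selection',
    'rule-id': tableName,
    'rule-name': tableName,
    'object-locator': { 'schema-name': schema, 'table-name': tableName },
    'rule-action': 'include',
  }));
  return JSON.stringify({ rules });
}
```

Calling buildTableMappings(['users', 'orders']) yields a JSON string with two selection rules, which is exactly the string handed to tableMappings on the replication config.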
Step 3: Glue Catalog Base
The Glue catalog provides metadata management and schema discovery:
export interface GlueBaseCatalogProps {
envName: string;
projectName: string;
s3DataBucket: s3.Bucket;
cdcPath: string;
catalogSuffix: string;
useGlueCrawler?: boolean;
crawlerSchedule?: string;
}
export abstract class GlueBaseCatalog extends Construct {
public readonly database: glue.CfnDatabase;
public readonly crawler?: glue.CfnCrawler;
protected readonly databaseName: string;
constructor(scope: Construct, id: string, props: GlueBaseCatalogProps) {
super(scope, id);
this.databaseName = `${props.projectName}_${props.envName}_${props.catalogSuffix}`;
// Create Glue Database
this.database = new glue.CfnDatabase(this, 'Database', {
catalogId: Stack.of(this).account,
databaseInput: {
name: this.databaseName,
description: this.getDatabaseDescription(),
},
});
// Create crawler for automatic schema discovery
if (props.useGlueCrawler) {
this.crawler = this.createCrawler(props);
}
}
private createCrawler(props: GlueBaseCatalogProps): glue.CfnCrawler {
const crawlerRole = new iam.Role(this, 'CrawlerRole', {
assumedBy: new iam.ServicePrincipal('glue.amazonaws.com'),
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole')
],
inlinePolicies: {
S3Access: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
's3:GetObject',
's3:ListBucket',
's3:GetBucketLocation'
],
resources: [
props.s3DataBucket.bucketArn,
`${props.s3DataBucket.bucketArn}/*`
]
})
]
})
}
});
const s3Target = `s3://${props.s3DataBucket.bucketName}/rds-cdc/public/`;
return new glue.CfnCrawler(this, 'Crawler', {
name: `${props.projectName}-${props.envName}-${props.catalogSuffix}-crawler`,
role: crawlerRole.roleArn,
databaseName: this.databaseName,
targets: {
s3Targets: [{
path: s3Target,
exclusions: ['**/_tmp/**', '**/.tmp/**']
}]
},
configuration: JSON.stringify({
Version: 1.0,
CrawlerOutput: {
Partitions: { AddOrUpdateBehavior: 'InheritFromTable' },
Tables: { AddOrUpdateBehavior: 'MergeNewColumns' }
},
Grouping: {
TableGroupingPolicy: 'CombineCompatibleSchemas',
TableLevelConfiguration: 4
}
}),
schemaChangePolicy: {
updateBehavior: 'UPDATE_IN_DATABASE',
deleteBehavior: 'LOG'
},
...(props.crawlerSchedule && {
schedule: {
scheduleExpression: props.crawlerSchedule
}
})
});
}
protected abstract getDatabaseDescription(): string;
protected abstract getTableList(): string[];
public getDatabaseName(): string {
return this.databaseName;
}
}
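The TableLevelConfiguration: 4 setting above tells the crawler to treat the fourth path level as a table, counting the bucket itself as level 1 (bucket / rds-cdc / public / tablename). As an illustration, here is a hypothetical helper (not part of the constructs above) that derives the table name from a bucket-relative key under that assumption:

```typescript
// Given a key relative to the bucket (e.g. 'rds-cdc/public/users/file.parquet'),
// return the folder the crawler would treat as the table at the given level.
// The bucket counts as level 1, so the table folder is segment tableLevel - 2.
function tableNameForKey(key: string, tableLevel = 4): string | undefined {
  const segments = key.split('/');
  return segments.length >= tableLevel - 1 ? segments[tableLevel - 2] : undefined;
}
```

With the layout used in this article, every Parquet file under rds-cdc/public/users/ maps to a single 'users' table in the catalog.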
Step 4: Athena Workgroup
Athena provides SQL query capabilities:
export interface AthenaBaseProps {
envName: string;
projectName: string;
glueDatabase: string;
s3DataBucket: s3.Bucket;
workgroupSuffix: string;
}
export abstract class AthenaBase extends Construct {
public readonly workGroup: athena.CfnWorkGroup;
public readonly queryResultsBucket: s3.Bucket;
// Stored because createAthenaUserRole references these props later
protected readonly props: AthenaBaseProps;
constructor(scope: Construct, id: string, props: AthenaBaseProps) {
super(scope, id);
this.props = props;
const workgroupName = `${props.projectName}-${props.envName}-${props.workgroupSuffix}`;
// Create S3 bucket for query results
this.queryResultsBucket = new s3.Bucket(this, 'QueryResultsBucket', {
bucketName: `${props.projectName}-athena-${props.workgroupSuffix}-results-${props.envName}`,
removalPolicy: RemovalPolicy.DESTROY,
autoDeleteObjects: true,
blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
versioned: true,
lifecycleRules: [{
id: 'DeleteOldQueryResults',
enabled: true,
expiration: Duration.days(30),
}],
});
// Create Athena WorkGroup
this.workGroup = new athena.CfnWorkGroup(this, 'WorkGroup', {
name: workgroupName,
description: this.getWorkGroupDescription(),
state: 'ENABLED',
workGroupConfiguration: {
resultConfiguration: {
outputLocation: `s3://${this.queryResultsBucket.bucketName}/query-results/`,
encryptionConfiguration: {
encryptionOption: 'SSE_S3',
},
},
enforceWorkGroupConfiguration: true,
bytesScannedCutoffPerQuery: 1000000000, // 1GB limit
},
});
}
protected abstract getWorkGroupDescription(): string;
public createAthenaUserRole(roleName: string): iam.Role {
return new iam.Role(this, `AthenaUserRole-${roleName}`, {
roleName: `${roleName}-athena-${this.props.workgroupSuffix}-${this.props.envName}`,
// Assumed by IAM principals in this account; Athena itself does not assume user roles
assumedBy: new iam.AccountRootPrincipal(),
description: `Role for Athena users to query data`,
inlinePolicies: {
AthenaQueryPolicy: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'athena:StartQueryExecution',
'athena:GetQueryExecution',
'athena:GetQueryResults',
],
resources: [`arn:aws:athena:*:*:workgroup/${this.workGroup.name}`],
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'glue:GetDatabase',
'glue:GetTable',
'glue:GetPartitions',
],
resources: ['*'],
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['s3:GetObject', 's3:ListBucket'],
resources: [
this.props.s3DataBucket.bucketArn,
`${this.props.s3DataBucket.bucketArn}/*`,
],
}),
],
}),
},
});
}
}
Step 5: Putting It All Together
Here's how to use these constructs in your stack:
export class DataLakeStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
// 1. Create required IAM roles (DMS requires specific names)
const dmsVpcRole = new iam.Role(this, 'DmsVpcRole', {
roleName: 'dms-vpc-role', // Required exact name
assumedBy: new iam.CompositePrincipal(
new iam.ServicePrincipal('dms.amazonaws.com'),
new iam.ServicePrincipal('dms.us-east-1.amazonaws.com')
),
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AmazonDMSVPCManagementRole')
],
});
const dmsCloudWatchLogsRole = new iam.Role(this, 'DmsCloudWatchLogsRole', {
roleName: 'dms-cloudwatch-logs-role', // Required exact name
assumedBy: new iam.ServicePrincipal('dms.amazonaws.com'),
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AmazonDMSCloudWatchLogsRole')
],
});
const dmsSecretsAccessRole = new iam.Role(this, 'DmsSecretsAccessRole', {
roleName: `dms-secrets-access-role-production`,
assumedBy: new iam.CompositePrincipal(
new iam.ServicePrincipal('dms.amazonaws.com'),
new iam.ServicePrincipal('dms.us-east-1.amazonaws.com')
),
inlinePolicies: {
SecretsRead: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
actions: ['secretsmanager:GetSecretValue'],
resources: ['*'],
}),
],
}),
},
});
// 2. Create S3 data lake
const s3Base = new S3Base(this, 'S3DmsBase', {
bucketName: `my-project-production-dms-data-lake`,
iamRolesProps: [{
assumedBy: new iam.ServicePrincipal('dms.amazonaws.com'),
description: 'Role for DMS to write to S3',
}]
});
// 3. Create Glue Catalog
const glueCatalog = new GlueSourceCatalog(this, 'GlueCatalog', {
envName: 'production',
projectName: 'my-project',
s3DataBucket: s3Base.bucket,
cdcPath: 'source-database/',
catalogSuffix: 'source_db',
useGlueCrawler: true,
});
// 4. Create DMS replication
new SourceDatabaseDms(this, 'SourceDatabaseDms', {
envName: 'production',
projectName: 'my-project',
s3Base,
databaseName: 'mydb',
secretsManagerSecretId: 'my-db-secret',
replicationConfigIdentifier: 'source-database-production',
vpcTag: 'main-vpc',
dmsVpcRole,
dmsCloudWatchLogsRole,
dmsSecretsAccessRole,
glueCatalog,
});
// 5. Create Athena workgroup
const athenaSource = new AthenaSource(this, 'AthenaSource', {
envName: 'production',
projectName: 'my-project',
glueDatabase: glueCatalog.getDatabaseName(),
s3DataBucket: s3Base.bucket,
});
// 6. Create Athena user role
athenaSource.createAthenaUserRole('DataAnalyst');
}
}
Configuration Details
Table Selection
Define which tables to replicate in a constants file:
export const selectedTables = [
'users',
'orders',
'products',
'customers',
// ... more tables
];
export const cdcPath = '<instance-name>-database/';
PostgreSQL Configuration
Ensure your PostgreSQL database has these settings:
-- Enable logical replication
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;
-- Restart PostgreSQL after making these changes
If you're on Amazon RDS, apply these settings through a DB parameter group instead, since ALTER SYSTEM isn't permitted there (for RDS PostgreSQL, setting rds.logical_replication = 1 enables wal_level = logical).
Secrets Manager Format
Your database secret should have this structure:
{
"username": "dbuser",
"password": "securepassword",
"host": "mydb.region.rds.amazonaws.com",
"port": 5432,
"dbname": "mydb"
}
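A malformed secret is a common cause of silent DMS connection failures, so it's worth validating the payload shape before wiring it in. Here is a hypothetical validation sketch (not part of the constructs above) that checks for the fields shown:

```typescript
// Expected shape of the Secrets Manager payload for the DMS source endpoint
interface DbSecret {
  username: string;
  password: string;
  host: string;
  port: number;
  dbname: string;
}

// Type guard: true only if every required field is present with the right type
function isValidDbSecret(value: unknown): value is DbSecret {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.username === 'string' &&
    typeof v.password === 'string' &&
    typeof v.host === 'string' &&
    typeof v.port === 'number' &&
    typeof v.dbname === 'string'
  );
}
```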
Key Features & Best Practices
1. Serverless DMS
We use DMS Serverless with auto-scaling:
- Min capacity: 2 DCU
- Max capacity: 4 DCU
- Multi-AZ for high availability
2. Parquet Format
DMS writes data in Parquet format with:
- GZIP compression for storage efficiency
- Operation column (Op) for CDC tracking (I=Insert, U=Update, D=Delete)
- Timestamp column (__dms_ts) for change tracking
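To see what the Op and __dms_ts columns enable, here is a small standalone sketch (hypothetical types, not part of the constructs above) that folds a stream of CDC rows into the current table state — the same logic the Athena deduplication query in the "Querying Data with Athena" section performs in SQL:

```typescript
// One CDC row: operation, change timestamp, and the row payload
interface CdcRow {
  id: number;
  op: 'I' | 'U' | 'D';
  ts: number;
  data: Record<string, unknown>;
}

// Replay rows in timestamp order: inserts/updates overwrite, deletes remove
function applyCdc(rows: CdcRow[]): Map<number, Record<string, unknown>> {
  const state = new Map<number, Record<string, unknown>>();
  for (const row of [...rows].sort((a, b) => a.ts - b.ts)) {
    if (row.op === 'D') state.delete(row.id);
    else state.set(row.id, row.data);
  }
  return state;
}
```

Replaying an insert, an update, and a delete for hypothetical ids leaves only the latest non-deleted version of each row, mirroring the ROW_NUMBER() + Op != 'D' query later in the article.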
3. S3 Structure
Data is organized as:
s3://bucket/rds-cdc/public/tablename/
├── LOAD00000001.parquet
├── 20231001-12345.parquet
└── 20231002-67890.parquet
I recommend organizing your data this way so that Glue Crawler auto-discovery works out of the box.
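As a quick illustration of this layout, here is a hypothetical helper (assuming the LOADxxxxxxxx and timestamp-prefixed naming conventions shown above, which is how DMS names its S3 output files) that classifies a file as full-load or CDC output:

```typescript
// Classify a DMS output file by its name:
//   LOAD00000001.parquet      -> initial full-load file
//   20231001-12345.parquet    -> CDC batch file (date prefix + sequence)
function classifyDmsFile(key: string): 'full-load' | 'cdc' | 'other' {
  const name = key.split('/').pop() ?? '';
  if (/^LOAD\d+\.parquet$/.test(name)) return 'full-load';
  if (/^\d{8}-\d+\.parquet$/.test(name)) return 'cdc';
  return 'other';
}
```

This kind of check is handy in, for example, an S3-triggered Lambda that should react only to CDC batches and skip the initial load.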
4. Glue Crawler
The crawler runs automatically:
- Production: daily at 2 AM (adjust the schedule to your needs)
- Automatically discovers new tables and schema changes
- Updates the Glue Data Catalog
5. Security
Multiple layers of security:
- VPC endpoints for private connectivity
- IAM roles with least privilege
- S3 bucket encryption (SSE-S3)
- SSL/TLS for database connections
- Secrets Manager for credentials
Querying Data with Athena
Once deployed, you can query your data in Athena using SQL:
-- View recent changes
SELECT *
FROM my_project_production_source_db.users
WHERE __dms_ts > current_timestamp - interval '1' day
ORDER BY __dms_ts DESC;
-- Get current state (latest record for each ID)
WITH latest_records AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY __dms_ts DESC) as rn
FROM my_project_production_source_db.users
)
SELECT *
FROM latest_records
WHERE rn = 1 AND Op != 'D'; -- Exclude deleted records
Cost Optimization
- DMS Serverless: Only pay for active replication time
- S3 Lifecycle: Automatically delete query results after 30 days
- Parquet + Gzip: Reduces storage by ~70% compared to CSV
- Athena Query Limits: 1GB scan limit per query prevents runaway costs
Monitoring
CloudWatch provides comprehensive monitoring:
- DMS replication lag
- S3 bucket metrics
- Glue crawler runs
- Athena query metrics
Create an alarm on replication latency, for example. Note that CfnReplicationConfig (an L1 construct) doesn't expose metric helper methods, so build the CloudWatch metric directly — DMS publishes metrics such as CDCLatencySource in the AWS/DMS namespace (add the dimensions matching your replication):
const latencyMetric = new cloudwatch.Metric({
namespace: 'AWS/DMS',
metricName: 'CDCLatencySource',
statistic: 'Average',
});
new cloudwatch.Alarm(this, 'DmsLatencyAlarm', {
metric: latencyMetric,
threshold: 300, // 5 minutes of source capture latency
evaluationPeriods: 2,
});
Deployment
Deploy your stack:
# Bootstrap CDK (first time only)
cdk bootstrap
# Synthesize CloudFormation template
cdk synth
# Deploy (replace DataLakeStack with your stack name)
cdk deploy DataLakeStack
# After deployment, start the DMS replication in AWS Console
# or use AWS CLI:
aws dms start-replication \
--replication-config-arn <arn> \
--start-replication-type start-replication
Common Issues & Solutions
Issue 1: DMS Can't Connect to PostgreSQL
Solution: Ensure RDS security group allows inbound from DMS security group on port 5432.
Issue 2: Tables Not Appearing in Glue
Solution: Run the Glue crawler manually first, then check CloudWatch logs.
Issue 3: Athena Queries Return No Results
Solution: Verify S3 path in Glue table matches actual data location.
Issue 4: DMS Replication Fails
Solution: Check that PostgreSQL has wal_level = logical and sufficient replication slots (max_replication_slots).
Conclusion
This architecture provides a robust, scalable solution for replicating PostgreSQL data to a data lake. The combination of DMS, Glue, and Athena creates a powerful analytics platform that:
- Scales automatically with your data
- Provides near real-time data availability
- Supports complex analytical queries
- Maintains cost efficiency
- Offers production-grade reliability
The modular CDK design makes it easy to extend with additional databases, customize table selection, or add data transformations.
This architecture has been battle-tested in production environments handling millions of records daily. Feel free to adapt it to your specific needs!