Amazon Relational Database Service (Amazon RDS) is a managed database solution providing various engine options such as MySQL, MariaDB and PostgreSQL. With the growing concern on data protection, Amazon RDS supports encryption so that data stored at rest in the underlying storage is encrypted using the keys managed in AWS Key Management Service (KMS).
While an RDS instance can be encrypted at creation, the encryption cannot be enabled after the instance is created. The workaround is to create another encrypted RDS instance, but the process involves a few manual actions which may be error-prone. In this article, we will go through an equivalent automation that leverages AWS Systems Manager (SSM). This creates an encrypted copy of an unencrypted RDS instance.
Note: There are some DB instance classes that do not support Amazon RDS encryption. For details, please refer to https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Overview.Encryption.html#Overview.Encryption.Availability.
Encrypting a RDS instance
Create an unencrypted snapshot of the unencrypted RDS instance.
Create an encrypted copy of the unencrypted snapshot.
Restore an encrypted RDS instance from the encrypted snapshot.
Writing a custom SSM Automation document
A SSM document comprises of some top-level data elements, such as schemaVersion
, parameters
and mainSteps
.
schemaVersion
Specifically, Automation documents must use schema version 0.3
.
parameters
-
DBInstanceResourceId
orDBInstanceIdentifier
: Resource ID or identifier of the DB instance to be encrypted -
DBSnapshotIdentifier
: Identifier for the unencrypted DB snapshot -
EncryptedDBSnapshotIdentifier
: Identifier for the encrypted DB snapshot -
KmsKeyId
: ID, ARN or Alias of the AWS KMS Customer Master Key (CMK) -
InstanceTags
: Tags to be added to the DB instance -
SnapshotTags
: Tags to be added to the DB snapshot -
AutomationAssumeRole
: ARN of the role that allows Automation to perform the actions
mainSteps
-
Get the identifier of the unencrypted RDS instance.
This step exists to cater for the integration with AWS Config mentioned in the upcoming Advanced usage section.
import boto3 def handler(event, context): resource_id = event["DBInstanceResourceId"] instance_id = event["DBInstanceIdentifier"] if instance_id == "": rds_client = boto3.client("rds") instances = rds_client.describe_db_instances(Filters=[{ "Name": "dbi-resource-id", "Values": [resource_id] }]).get("DBInstances", [{}]) instance_id = instances[0].get("DBInstanceIdentifier", "") return {"instance_id": instance_id}
-
Create an unencrypted snapshot of the unencrypted RDS instance.
import json import re from datetime import datetime import boto3 DB_INSTANCE_ID = "DBInstanceIdentifier" INSTANCE_TAGS = "InstanceTags" SNAPSHOT_ARN = "DBSnapshotArn" SNAPSHOT_ID = "DBSnapshotIdentifier" SNAPSHOT_TAGS = "SnapshotTags" PH_DATE = "date" PH_DATETIME = "datetime" PH_EXECUTION_ID = "execution-id" PH_INSTANCE_ID = "db-instance-id" PH_SNAPSHOT_ID = "db-snapshot-id" PH_TIME = "time" PH_TAG_VAL_STR = "{{{}}}" TAG_SHORTCUT_EXPR = "Key=(.+),\s*Value=(.*)" def parse_tags(tags_str): if re.match("({};?)+".format(TAG_SHORTCUT_EXPR), tags_str): matches = [re.match(TAG_SHORTCUT_EXPR, t.strip()) for t in tags_str.split(";")] return [{"Key": m.group(1), "Value": m.group(2) if m.lastindex > 1 else ""} for m in matches] else: return json.loads(tags_str) def build_tags(tag_str, context, tag_vars=None): if tag_str == "": return [] placeholders = tag_data(ctx=context, tag_vars=tag_vars) tags = parse_tags(tag_str) for tag in tags: value = tag.get("Value") for p in placeholders: value = value.replace(PH_TAG_VAL_STR.format(p), str(placeholders[p])) tag["Value"] = value return tags def template_string(s, context, str_vars=None): result = s data = tag_data(ctx=context, tag_vars=str_vars) for p in data: result = result.replace(PH_TAG_VAL_STR.format(p), str(data[p])) return result def tag_data(ctx, tag_vars): def clean(s): return s.replace(":", "").replace("-", "").replace("T", "") dt = datetime.now().replace(microsecond=0) data = { PH_DATETIME: clean(dt.isoformat()), PH_DATE: clean(dt.date().isoformat()), PH_TIME: clean(dt.time().isoformat()), PH_EXECUTION_ID: ctx.get("automation:EXECUTION_ID") } if tag_vars is not None: for t in tag_vars: data[t] = tag_vars[t] return data def handler(event, context): client = boto3.client("rds") inst_id = event[DB_INSTANCE_ID] snapshot_str = event.get(SNAPSHOT_ID, "").strip() if snapshot_str == "": snapshot_str = "{db-instance-id}-{datetime}" tag_vars = { PH_INSTANCE_ID: inst_id, SNAPSHOT_ID: "" } snapshot_id = template_string(snapshot_str, context, tag_vars) args = { DB_INSTANCE_ID: inst_id, SNAPSHOT_ID: snapshot_id } response = client.create_db_snapshot(**args) snapshot_arn = response["DBSnapshot"]["DBSnapshotArn"] snapshot_tag_str = event.get(SNAPSHOT_TAGS, "") if len(snapshot_tag_str) > 0: snapshot_tags = build_tags(snapshot_tag_str, context, tag_vars) if len(snapshot_tags) > 0: client.add_tags_to_resource(ResourceName=snapshot_arn, Tags=snapshot_tags) instance_tag_str = event.get(INSTANCE_TAGS, "") if len(instance_tag_str) > 0: tag_vars[PH_SNAPSHOT_ID] = snapshot_id instance_tags = build_tags(instance_tag_str, context, tag_vars) if len(instance_tags) > 0: db_arn = ":".join(snapshot_arn.split(":")[0:5]) + ":db:" + inst_id client.add_tags_to_resource(ResourceName=db_arn, Tags=instance_tags) return {"snapshot_id" : snapshot_id}
-
Verify that the unencrypted snapshot created in the previous step exists.
This step is necessary because it takes time for the snapshot to become ready for use.
import boto3 import time rds_client = boto3.client("rds") def handler(event, context): snapshot_id = event["DBSnapshotIdentifier"] while True: try: snapshots = rds_client.describe_db_snapshots(DBSnapshotIdentifier=snapshot_id).get("DBSnapshots", [{}]) if snapshots[0].get("Status", "") == "available": return time.sleep(20) except Exception as e: print(e) time.sleep(20) pass
-
Copy the unencrypted snapshot to an encrypted snapshot.
import boto3 from datetime import datetime def handler(event, context): SOURCE_SNAPSHOT_ID = event["DBSnapshotIdentifier"] DEST_SNAPSHOT_ID = event["EncryptedDBSnapshotIdentifier"] if event["EncryptedDBSnapshotIdentifier"] == "": DEST_SNAPSHOT_ID = event["DBSnapshotIdentifier"] + "-encrypted" kmskey_id = event["KmsKeyId"] if event["KmsKeyId"] == "": kmskey_id = "alias/aws/rds" client = boto3.client("rds") response = client.copy_db_snapshot( SourceDBSnapshotIdentifier=SOURCE_SNAPSHOT_ID, TargetDBSnapshotIdentifier=DEST_SNAPSHOT_ID, KmsKeyId=kmskey_id, CopyTags=True, ) snapshot_id = response["DBSnapshot"]["DBSnapshotIdentifier"] return {"snapshot_id" : snapshot_id}
-
Verify that the encrypted snapshot created in the previous step exists.
Similar to step 3, this step is necessary because it takes time for the snapshot to become ready for use.
import boto3 import time rds_client = boto3.client("rds") def handler(event, context): snapshot_id = event["EncryptedDBSnapshotIdentifier"] while True: try: snapshots = rds_client.describe_db_snapshots(DBSnapshotIdentifier = snapshot_id).get("DBSnapshots", [{}]) if snapshots[0].get("Status", "") == "available" and snapshots[0].get("Encrypted", False) == True: return time.sleep(20) except Exception as e: print(e) time.sleep(20) pass
-
Delete the unencrypted snapshot.
This step is optional during the encryption, but doing so helps remove redundant resource and hence saves the cost.
import boto3 import time rds_client = boto3.client("rds") def handler(event, context): snapshot_id = event["DBSnapshotIdentifier"] wait_period = 5 retries = 5 while True: try: rds_client.delete_db_snapshot(DBSnapshotIdentifier=snapshot_id) return True except Exception as ex: # As the list of snapshot is eventually consistent old snapshots might appear in listed snapshots if getattr(ex, "response", {}).get("Error", {}).get("Code", "") == "InvalidSnapshot.NotFound": return False # Throttling might occur when deleting snapshots too fast if "throttling" in ex.message.lower(): retries -= 1 if retries == 0: raise ex time.sleep(wait_period) wait_period = min(wait_period + 10 , 30) continue raise ex
-
Rename the unencrypted RDS instance.
This step can be achieved by executing the
ModifyDBInstance
API. The purpose is to allow the encrypted RDS instance to be created with the same identifier as the old unencrypted RDS instance, so that the connection(s) from other clients can be kept unchanged. -
Verify that the unencrypted RDS instance is renamed in the previous step.
This step is necessary because it takes time for the DB instance to reflect changes.
import boto3 import time rds_client = boto3.client("rds") def handler(event, context): instance_id = event["RenamedDBInstanceIdentifier"] while True: try: instances = rds_client.describe_db_instances(DBInstanceIdentifier=instance_id).get("DBInstances", [{}]) if instances[0].get("DBInstanceStatus", "") == "available": return time.sleep(20) except Exception as e: print(e) time.sleep(20) pass
-
Restore an encrypted RDS instance from the encrypted snapshot.
This step can be achieved by executing the
RestoreDBInstanceFromDBSnapshot
API. -
Verify that the encrypted RDS instance created in the previous step exists.
This step is necessary because it takes time to create the RDS instance.
import boto3 import time rds_client = boto3.client("rds") def handler(event, context): instance_id = event["EncryptedDBInstanceIdentifier"] while True: try: instances = rds_client.describe_db_instances(DBInstanceIdentifier = instance_id).get("DBInstances", [{}]) if instances[0].get("StorageEncrypted", False) == True: return time.sleep(20) except Exception as e: print(e) time.sleep(20) pass
The complete SSM document be found in https://github.com/tiksangng/aws-community-resources/blob/main/config-remediation/rds-encryption/EncryptRDSInstance.yaml. For higher readability and future maintenance, some descriptions are supplemented in every parameter and intermediate step.
Creating a custom SSM Automation document
In the AWS Systems Manager console, select Documents in the navigation pane, click Create document and choose Automation.
For Name, enter
EncryptRDSInstance
. Switch to the Editor tab, click Edit and replace the content with the above document. Click Create automation.
Executing the SSM Automation
To test for the automation, an unencrypted RDS instance named rds-without-encryption
has been prepared in advance.
On the Documents page of the AWS Systems Manager console, switch to the Owned by me tab and click
EncryptRDSInstance
.
In the Input parameters section, specify the
DBInstanceIdentifier
andAutomationAssumeRole
. Optionally, you can specify other parameters as well. Click Execute.
The execution will be completed after a few minutes. The Overall status should show Success.
From the RDS Management Console, you can also see the original unencrypted DB instance rds-without-encryption
is now renamed to rds-without-encryption-unencrypted
, and there is a new encrypted DB instance created with the same name rds-without-encryption
as the original unencrypted DB instance.
Advanced usage
The above SSM document can serve as the remediation method for the AWS Config rule rds-storage-encrypted
. The entire solution can be deployed with an AWS CloudFormation template, which can be found in https://github.com/tiksangng/aws-community-resources/blob/main/config-remediation/rds-encryption/main.yaml.
Note: The remediation is not set to automatic since the deletion of the original unencrypted RDS instance is not included in the automation.
For a step-by-step walkthrough in deploying AWS Config rule and remediation method, please refer to the following article.
AWS Config Auto Remediation for Configuring S3 Lifecycle Rule
Dickson for AWS Community Builders ・ Jan 22 '23
References
- https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Overview.Encryption.html#Overview.Encryption.Enabling
- https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_CreateSnapshot.html
- https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_CopySnapshot.html#USER_CopyDBSnapshot
- https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_RestoreFromSnapshot.html#USER_RestoreFromSnapshot.Restoring
- https://docs.aws.amazon.com/systems-manager/latest/userguide/sysman-doc-syntax.html#top-level
Top comments (0)