Need a way to automatically back up your AWS Route53 public DNS zones? Look no further, as a combination of the following AWS products can fit the need:
- Lambda
- Route53
- CloudWatch
- S3
Here's a diagram of the process:
This will execute a Lambda function every 6 hours (or whichever you set the CloudWatch event to). It will use the IAM role to export your Route53 public zones as a CSV & JSON to the S3 bucket of your choice.
- Create a private S3 bucket, as it will be the destination for your backups.
- Set the s3_bucket_name variable to your AWS S3 bucket name.
- Set the s3_bucket_region variable to your AWS S3 region.
- Create an IAM role with an attached policy for Route53 read-only and S3 read/write to your S3 Bucket. I have provided an example here.
- Create a CloudWatch event for every 6 hours (or desired recurring duration).
- Upload the Lambda Python function below (copy and save it as aws_s3_route53_backups.py, for example).
- Assign the IAM role created in step 4 as the function's execution role, and use the scheduled CloudWatch event created in step 5 as its trigger.
- Check the S3 bucket for your backups and verify.
Now you can sleep a bit more peacefully knowing that when (or if) you blow out a record set in your hosted public zone, you'll have a backup!
aws_s3_route53_backups.py
"""AWS Route 53 Lambda Backup"""
import os
import csv
import json
import time
from datetime import datetime
import boto3
from botocore.exceptions import ClientError
# Set environmental variables
# Each setting is read on its own so that one missing variable does not
# prevent the other from being picked up; a missing one falls back to ''.
s3_bucket_name = os.environ.get('s3_bucket_name', '')
s3_bucket_region = os.environ.get('s3_bucket_region', '')
_missing_env = [
    name for name in ('s3_bucket_name', 's3_bucket_region')
    if name not in os.environ
]
if _missing_env:
    # Name the missing variable(s) so a misconfigured function is easy to fix.
    print("Warning: Environmental variable(s) not defined: "
          + ', '.join(_missing_env))
# Create client objects
# NOTE(review): the S3 client is pinned to us-east-1 regardless of
# s3_bucket_region -- confirm this is intended.
s3 = boto3.client('s3', region_name='us-east-1')
route53 = boto3.client('route53')
# Functions
def create_s3_bucket(bucket_name, bucket_region='us-east-1'):
    """Return the bucket's metadata, creating the bucket if it is missing.

    Args:
        bucket_name: Name of the S3 bucket to look up or create.
        bucket_region: Region for a newly created bucket. An empty or None
            value is treated the same as 'us-east-1'.

    Returns:
        The ``head_bucket`` response when the bucket already exists, the
        ``create_bucket`` response when it had to be created, or None when
        ``head_bucket`` fails with an error code other than '404'.

    Raises:
        ClientError: If the ``create_bucket`` call itself fails.
    """
    try:
        return s3.head_bucket(Bucket=bucket_name)
    except ClientError as e:
        # Only a '404' (bucket not found) leads to a create attempt; any
        # other error is printed and reported to the caller as None.
        if e.response['Error']['Code'] != '404':
            print(e)
            return None
    create_args = {'ACL': 'private', 'Bucket': bucket_name}
    # creating bucket in us-east-1 (N. Virginia) requires
    # no CreateBucketConfiguration parameter be passed; an unset region
    # takes the same path instead of sending an empty LocationConstraint
    if bucket_region and bucket_region != 'us-east-1':
        create_args['CreateBucketConfiguration'] = {
            'LocationConstraint': bucket_region
        }
    return s3.create_bucket(**create_args)
def upload_to_s3(folder, filename, bucket_name, key):
    """Upload a local file to ``<folder>/<key>`` in an Amazon S3 bucket.

    Args:
        folder: Key prefix placed in front of ``key``.
        filename: Path of the local file to upload.
        bucket_name: Destination bucket.
        key: Object name appended to ``folder`` after a '/'.
    """
    object_key = '/'.join((folder, key))
    s3.upload_file(Filename=filename, Bucket=bucket_name, Key=object_key)
def get_route53_hosted_zones(next_zone=None):
    """Return the list of hosted zones in Amazon Route 53, page by page.

    Args:
        next_zone: Optional ``(dns_name, hosted_zone_id)`` pair to start
            listing from; when omitted the listing starts at the beginning.

    Returns:
        The concatenated 'HostedZones' entries of every page fetched.
    """
    zones = []
    cursor = next_zone
    while True:
        if cursor:
            page = route53.list_hosted_zones_by_name(
                DNSName=cursor[0],
                HostedZoneId=cursor[1]
            )
        else:
            page = route53.list_hosted_zones_by_name()
        zones += page['HostedZones']
        # a truncated page carries the name/id to resume the listing from
        if not page['IsTruncated']:
            return zones
        cursor = (page['NextDNSName'], page['NextHostedZoneId'])
def get_route53_zone_records(zone_id, next_record=None):
    """Return every record set of a hosted zone in Route 53.

    Args:
        zone_id: ID of the hosted zone whose record sets are listed.
        next_record: Optional position to start listing from, given as
            ``(name, type)`` or ``(name, type, set_identifier)``.

    Returns:
        The concatenated 'ResourceRecordSets' entries of every page fetched.
    """
    records = []
    cursor = next_record
    while True:
        params = {'HostedZoneId': zone_id}
        if cursor:
            params['StartRecordName'] = cursor[0]
            params['StartRecordType'] = cursor[1]
            # Record sets that share a name and type (weighted, latency,
            # failover, ...) are told apart by their set identifier; without
            # it the next page would restart at the first of them.
            if len(cursor) > 2 and cursor[2]:
                params['StartRecordIdentifier'] = cursor[2]
        page = route53.list_resource_record_sets(**params)
        records.extend(page['ResourceRecordSets'])
        # if response is truncated, fetch again from the next record position
        if not page['IsTruncated']:
            return records
        cursor = (
            page['NextRecordName'],
            page['NextRecordType'],
            page.get('NextRecordIdentifier'),
        )
def get_record_value(record):
    """Return a list of values for a hosted zone record.

    Args:
        record: A record set dict. Either its 'AliasTarget' (with
            'HostedZoneId' and 'DNSName') or its 'ResourceRecords' list is
            used.

    Returns:
        ``['ALIAS:<HostedZoneId>:<DNSName>']`` for an alias record, otherwise
        the 'Value' of each entry in 'ResourceRecords'. A record that has
        neither yields an empty list.
    """
    try:
        alias = record['AliasTarget']
        return [':'.join(['ALIAS', alias['HostedZoneId'], alias['DNSName']])]
    except KeyError:
        # not an alias (or an incomplete one): fall back to plain values
        pass
    # A missing 'ResourceRecords' key gives an empty list, not a KeyError.
    return [entry['Value'] for entry in record.get('ResourceRecords') or []]
def try_record(test, record):
    """Look up ``record[test]``, falling back to an empty string.

    Args:
        test: Key to look up.
        record: Object to index; it may be something that cannot be indexed
            by ``test`` at all (for example the '' returned by an earlier
            failed lookup).

    Returns:
        ``record[test]``, or '' when the lookup raises KeyError or TypeError.
    """
    try:
        return record[test]
    except (KeyError, TypeError):
        return ''
def write_zone_to_csv(zone, zone_records):
    """Dump a hosted zone's record sets to a csv file under /tmp/.

    The file is named '/tmp/' + zone['Name'] + 'csv' (presumably the zone
    name ends with a '.', which then serves as the extension separator).

    Args:
        zone: Hosted zone dict; only its 'Name' is used.
        zone_records: Iterable of record set dicts for that zone.

    Returns:
        The path of the csv file that was written.
    """
    header = [
        'NAME', 'TYPE', 'VALUE',
        'TTL', 'REGION', 'WEIGHT',
        'SETID', 'FAILOVER', 'EVALUATE_HEALTH'
    ]
    csv_path = '/tmp/' + zone['Name'] + 'csv'
    with open(csv_path, 'w', newline='') as handle:
        out = csv.writer(handle)
        out.writerow(header)
        for record in zone_records:
            # columns to the left and right of the VALUE column
            leading = [record['Name'], record['Type']]
            trailing = [
                try_record('TTL', record),
                try_record('Region', record),
                try_record('Weight', record),
                try_record('SetIdentifier', record),
                try_record('Failover', record),
                try_record('EvaluateTargetHealth',
                           try_record('AliasTarget', record)),
            ]
            # one row per value (e.g., MX records with several values)
            for single_value in get_record_value(record):
                out.writerow(leading + [single_value] + trailing)
    return csv_path
def write_zone_to_json(zone, zone_records):
    """Dump a hosted zone's record sets to a json file under /tmp/.

    Args:
        zone: Hosted zone dict; only its 'Name' is used to build the file
            name '/tmp/' + zone['Name'] + 'json'.
        zone_records: JSON-serializable record sets for that zone.

    Returns:
        The path of the json file that was written.
    """
    json_path = ''.join(['/tmp/', zone['Name'], 'json'])
    with open(json_path, 'w') as handle:
        # 4-space indentation keeps the backup readable
        json.dump(zone_records, fp=handle, indent=4)
    return json_path
## HANDLER FUNCTION ##
def lambda_handler(event, context):
    """Handler function for AWS Lambda: back up every hosted zone to S3.

    Each zone is written as csv and json and uploaded under
    ``<UTC timestamp>/<zone name without its last character>/``. When several
    hosted zones share a name, the zone ID is appended to the folder so that
    their backups do not overwrite each other.

    Args:
        event: Unused.
        context: Unused.

    Returns:
        False when the destination bucket is unavailable (see
        ``create_s3_bucket``), True once every zone has been uploaded.
    """
    # time.gmtime() is already UTC, so no datetime conversion is needed
    time_stamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    if not create_s3_bucket(s3_bucket_name, s3_bucket_region):
        return False
    hosted_zones = get_route53_hosted_zones()
    # count zone names so that zones sharing a name can be told apart
    name_counts = {}
    for zone in hosted_zones:
        name_counts[zone['Name']] = name_counts.get(zone['Name'], 0) + 1
    for zone in hosted_zones:
        zone_folder = time_stamp + '/' + zone['Name'][:-1]
        if name_counts[zone['Name']] > 1:
            # keep only the last path segment of the zone ID
            zone_folder += '_' + zone['Id'].split('/')[-1]
        zone_records = get_route53_zone_records(zone['Id'])
        # each file is uploaded right after it is written, so zones that
        # share a /tmp/ file name cannot clobber each other's upload
        upload_to_s3(
            zone_folder,
            write_zone_to_csv(zone, zone_records),
            s3_bucket_name,
            (zone['Name'] + 'csv')
        )
        upload_to_s3(
            zone_folder,
            write_zone_to_json(zone, zone_records),
            s3_bucket_name,
            (zone['Name'] + 'json')
        )
    return True
# Allow a manual run outside of Lambda. The handler never reads its event or
# context arguments, so placeholder values are passed.
if __name__ == "__main__":
    lambda_handler(event=0, context=0)
Top comments (3)
But what if 2 DNS zones share the same name as one is private and the other happens to be public? Will the py get information of both? How will it save 2 files (folders) with the same name in S3?
Thanks,
Your paste link for the IAM policy does not work.
Could someone provide an example of the "environmental variables"? I've tried with what I assume are correct for my conditions, but it refuses them, even though the user account can access both the S3 buckets and Route53.
s3_bucket_name = 'test-route53'
s3_bucket_region = 'us-east-1'