Need a way to automatically back up your AWS Route53 public DNS zones? Look no further, as a combination of the following AWS products can fit the need:
- Lambda
- Route53
- CloudWatch
- S3
Here's a diagram of the process:
This will execute a Lambda function every 6 hours (or whichever you set the CloudWatch event to). It will use the IAM role to export your Route53 public zones as a CSV & JSON to the S3 bucket of your choice.
- Create a S3 private bucket, as it will be your destination for the backups.
- Set the s3_bucket_name variable to your AWS S3 bucket name.
- Set the s3_bucket_region variable to your AWS S3 region.
- Create an IAM role with an attached policy for Route53 read-only and S3 read/write to your S3 Bucket. I have provided an example here.
- Create a CloudWatch event for every 6 hours (or desired recurring duration).
- Upload the below Lambda Python function (copy and save it as aws_s3_route53.py for example).
- Assign the execution role to the IAM role created in step 4, and use the scheduled CloudWatch event trigger created in step 5.
- Check the S3 bucket for your backups and verify.
Now you can sleep a bit more peacefully knowing that you when\if you blow out a record-set in your hosted public zone, you'll have a backup!
aws_s3_route53_backups.py
"""AWS Route 53 Lambda Backup"""
import os
import csv
import json
import time
from datetime import datetime
import boto3
from botocore.exceptions import ClientError
# Set environmental variables
s3_bucket_name = ''
s3_bucket_region = ''
try:
s3_bucket_name = os.environ['s3_bucket_name']
s3_bucket_region = os.environ['s3_bucket_region']
except KeyError as e:
print("Warning: Environmental variable(s) not defined")
# Create client objects
s3 = boto3.client('s3', region_name='us-east-1')
route53 = boto3.client('route53')
# Functions
def create_s3_bucket(bucket_name, bucket_region='us-east-1'):
"""Create an Amazon S3 bucket."""
try:
response = s3.head_bucket(Bucket=bucket_name)
return response
except ClientError as e:
if(e.response['Error']['Code'] != '404'):
print(e)
return None
# creating bucket in us-east-1 (N. Virginia) requires
# no CreateBucketConfiguration parameter be passed
if(bucket_region == 'us-east-1'):
response = s3.create_bucket(
ACL='private',
Bucket=bucket_name
)
else:
response = s3.create_bucket(
ACL='private',
Bucket=bucket_name,
CreateBucketConfiguration={
'LocationConstraint': bucket_region
}
)
return response
def upload_to_s3(folder, filename, bucket_name, key):
"""Upload a file to a folder in an Amazon S3 bucket."""
key = folder + '/' + key
s3.upload_file(filename, bucket_name, key)
def get_route53_hosted_zones(next_zone=None):
"""Recursively returns a list of hosted zones in Amazon Route 53."""
if(next_zone):
response = route53.list_hosted_zones_by_name(
DNSName=next_zone[0],
HostedZoneId=next_zone[1]
)
else:
response = route53.list_hosted_zones_by_name()
hosted_zones = response['HostedZones']
# if response is truncated, call function again with next zone name/id
if(response['IsTruncated']):
hosted_zones += get_route53_hosted_zones(
(response['NextDNSName'],
response['NextHostedZoneId'])
)
return hosted_zones
def get_route53_zone_records(zone_id, next_record=None):
"""Recursively returns a list of records of a hosted zone in Route 53."""
if(next_record):
response = route53.list_resource_record_sets(
HostedZoneId=zone_id,
StartRecordName=next_record[0],
StartRecordType=next_record[1]
)
else:
response = route53.list_resource_record_sets(HostedZoneId=zone_id)
zone_records = response['ResourceRecordSets']
# if response is truncated, call function again with next record name/id
if(response['IsTruncated']):
zone_records += get_route53_zone_records(
zone_id,
(response['NextRecordName'],
response['NextRecordType'])
)
return zone_records
def get_record_value(record):
"""Return a list of values for a hosted zone record."""
# test if record's value is Alias or dict of records
try:
value = [':'.join(
['ALIAS', record['AliasTarget']['HostedZoneId'],
record['AliasTarget']['DNSName']]
)]
except KeyError:
value = []
for v in record['ResourceRecords']:
value.append(v['Value'])
return value
def try_record(test, record):
"""Return a value for a record"""
# test for Key and Type errors
try:
value = record[test]
except KeyError:
value = ''
except TypeError:
value = ''
return value
def write_zone_to_csv(zone, zone_records):
"""Write hosted zone records to a csv file in /tmp/."""
zone_file_name = '/tmp/' + zone['Name'] + 'csv'
# write to csv file with zone name
with open(zone_file_name, 'w', newline='') as csv_file:
writer = csv.writer(csv_file)
# write column headers
writer.writerow([
'NAME', 'TYPE', 'VALUE',
'TTL', 'REGION', 'WEIGHT',
'SETID', 'FAILOVER', 'EVALUATE_HEALTH'
])
# loop through all the records for a given zone
for record in zone_records:
csv_row = [''] * 9
csv_row[0] = record['Name']
csv_row[1] = record['Type']
csv_row[3] = try_record('TTL', record)
csv_row[4] = try_record('Region', record)
csv_row[5] = try_record('Weight', record)
csv_row[6] = try_record('SetIdentifier', record)
csv_row[7] = try_record('Failover', record)
csv_row[8] = try_record('EvaluateTargetHealth',
try_record('AliasTarget', record)
)
value = get_record_value(record)
# if multiple values (e.g., MX records), write each as its own row
for v in value:
csv_row[2] = v
writer.writerow(csv_row)
return zone_file_name
def write_zone_to_json(zone, zone_records):
"""Write hosted zone records to a json file in /tmp/."""
zone_file_name = '/tmp/' + zone['Name'] + 'json'
# write to json file with zone name
with open(zone_file_name, 'w') as json_file:
json.dump(zone_records, json_file, indent=4)
return zone_file_name
## HANDLER FUNCTION ##
def lambda_handler(event, context):
"""Handler function for AWS Lambda"""
time_stamp = time.strftime("%Y-%m-%dT%H:%M:%SZ",
datetime.utcnow().utctimetuple()
)
if(not create_s3_bucket(s3_bucket_name, s3_bucket_region)):
return False
#bucket_response = create_s3_bucket(s3_bucket_name, s3_bucket_region)
#if(not bucket_response):
#return False
hosted_zones = get_route53_hosted_zones()
for zone in hosted_zones:
zone_folder = (time_stamp + '/' + zone['Name'][:-1])
zone_records = get_route53_zone_records(zone['Id'])
upload_to_s3(
zone_folder,
write_zone_to_csv(zone, zone_records),
s3_bucket_name,
(zone['Name'] + 'csv')
)
upload_to_s3(
zone_folder,
write_zone_to_json(zone, zone_records),
s3_bucket_name,
(zone['Name'] + 'json')
)
return True
if __name__ == "__main__":
lambda_handler(0, 0)
Top comments (3)
But what if 2 DNS zones share the same name as one is private and the other happens to be public? Will the py get information of both? How will it save 2 files (folders) with the same name in S3?
Thanks,
Could someone provide an example of the "environmental variables"? I've tried with what I assume are correct for my conditions, but it refuses them, even though the user account can access both the S3 buckets and Route53.
s3_bucket_name = 'test-route53'
s3_bucket_region = 'us-east-1'
Your paste link for the IAM policy does not work.